Limit a stream by a predicate

Tags: Java, Java 8, Java Stream

Java Problem Overview


Is there a Java 8 stream operation that limits a (potentially infinite) Stream until the first element fails to match a predicate?

In Java 9 we can use takeWhile as in the example below to print all the numbers less than 10.

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

As there is no such operation in Java 8, what's the best way of implementing it in a general way?

Java Solutions


Solution 1 - Java

Operations takeWhile and dropWhile have been added to JDK 9. Your example code

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

will behave exactly as you expect it to when compiled and run under JDK 9.

JDK 9 has been released. It is available for download here: JDK 9 Releases.
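As a small illustration of how the two new operations differ from filter, here is a sketch that requires JDK 9 or later. The class and method names are made up for this example; only takeWhile, dropWhile and filter are JDK API.

```java
import java.util.List;
import java.util.stream.Collectors;

final class TakeWhileVsFilter {
    // takeWhile keeps the leading run of matching elements and stops at the first failure
    static List<Integer> taken(List<Integer> in) {
        return in.stream().takeWhile(n -> n < 10).collect(Collectors.toList());
    }

    // dropWhile discards that same leading run and keeps everything after it
    static List<Integer> dropped(List<Integer> in) {
        return in.stream().dropWhile(n -> n < 10).collect(Collectors.toList());
    }

    // filter tests every element, so matches after the first failure are kept too
    static List<Integer> filtered(List<Integer> in) {
        return in.stream().filter(n -> n < 10).collect(Collectors.toList());
    }
}
```

For the input 1, 2, 3, 10, 4, 5 this gives [1, 2, 3] for taken, [10, 4, 5] for dropped and [1, 2, 3, 4, 5] for filtered. Only takeWhile is suitable for an infinite source, because filter never stops pulling elements.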

Solution 2 - Java

Such an operation ought to be possible with a Java 8 Stream, but it can't necessarily be done efficiently -- for example, you can't necessarily parallelize such an operation, as you have to look at elements in order.

The API doesn't provide an easy way to do it, but what's probably the simplest way is to take Stream.iterator(), wrap the Iterator to have a "take-while" implementation, and then go back to a Spliterator and then a Stream. Or -- maybe -- wrap the Spliterator, though it can't really be split anymore in this implementation.

Here's an untested implementation of takeWhile on a Spliterator:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}
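The other route mentioned above, wrapping the Iterator and going back to a Stream, could look roughly like the sketch below. It is only one possible shape, written for Java 8; the class name IteratorTakeWhile is hypothetical, and the result is always a sequential stream.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class IteratorTakeWhile {
    static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
        Iterator<T> source = stream.iterator();
        Iterator<T> limited = new Iterator<T>() {
            private T next;
            private boolean buffered; // true while 'next' holds an element not yet handed out
            private boolean done;     // true once the predicate failed or the source ran out

            @Override
            public boolean hasNext() {
                if (buffered) return true;
                if (done) return false;
                if (source.hasNext()) {
                    T candidate = source.next();
                    if (predicate.test(candidate)) {
                        next = candidate;
                        buffered = true;
                        return true;
                    }
                }
                done = true; // never look at the source again
                return false;
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                buffered = false;
                T result = next;
                next = null;
                return result;
            }
        };
        // The size is unknown up front, so no SIZED characteristic is reported
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(limited, Spliterator.ORDERED), false)
            .onClose(stream::close);
    }
}
```

A separate buffered flag is used instead of a null check so that null elements that pass the predicate are not mistaken for "nothing buffered".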

Solution 3 - Java

allMatch() is a short-circuiting function, so you can use it to stop processing. The main disadvantage is that you have to do your test twice: once to see if you should process it, and again to see whether to keep going.

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->{if (n<10) System.out.println(n);})
    .allMatch(n->n < 10);
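If testing twice bothers you, one possible variation is to do the processing inside the allMatch predicate itself, so the test runs once per element. This is only a sketch: collectWhile is a hypothetical helper, it relies on a side effect, and it should only be used on sequential streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

final class AllMatchTakeWhile {
    // Hypothetical helper: collects the leading elements that satisfy 'keep'
    static List<Integer> collectWhile(IntStream stream, IntPredicate keep) {
        List<Integer> taken = new ArrayList<>();
        stream.sequential().allMatch(n -> {
            if (!keep.test(n)) {
                return false; // allMatch short-circuits here, ending traversal
            }
            taken.add(n);     // side effect: "process" the element
            return true;
        });
        return taken;
    }
}
```

Calling collectWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10) returns the numbers 1 through 9 and terminates even though the source is infinite.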

Solution 4 - Java

As a follow-up to @StuartMarks's answer: my StreamEx library has a takeWhile operation that is compatible with the current JDK 9 implementation. When running under JDK 9, it simply delegates to the JDK implementation (via MethodHandle.invokeExact, which is really fast). When running under JDK 8, the "polyfill" implementation is used. Using my library, the problem can be solved like this:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);

Solution 5 - Java

takeWhile is one of the functions provided by the protonpack library.

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));

Solution 6 - Java

Update: Java 9 Stream now comes with a takeWhile method.

No need for hacks or other solutions. Just use that!


I am sure this can be greatly improved upon: (someone could make it thread-safe maybe)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);
    
TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

A hack for sure... Not elegant - but it works ~:D

class TakeWhile<T> implements Iterator<T> {

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) {
        this.iterator = s.iterator();
        this.predicate = p;
    }

    @Override
    public boolean hasNext() {
        if (!keepGoing) {
            return false;
        }
        if (next != null) {
            return true;
        }
        if (iterator.hasNext()) {
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) {
                next = null;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (next == null) {
            if (!hasNext()) {
                throw new NoSuchElementException("Sorry. Nothing for you.");
            }
        }
        T temp = next;
        next = null;
        return temp;
    }

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
        TakeWhile<T> tw = new TakeWhile<>(s, p);
        // The number of elements is not known up front, so do not report a fixed size
        Spliterator<T> split = Spliterators.spliteratorUnknownSize(tw, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    }

}

Solution 7 - Java

You can use Java 8 + RxJava.

import java.util.stream.IntStream;
import rx.Observable;


// Example 1: prints each element as it is tested, including the 10 that fails the predicate
IntStream intStream = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> {
        System.out.println(n);
        return n < 10;
    })
    .subscribe();


// Example 2 (a separate snippet): prints only the elements that pass the predicate
IntStream intStream = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach(n -> System.out.println(n));

Solution 8 - Java

Actually there are 2 ways to do it in Java 8 without any extra libraries or using Java 9.

If you want to print the even numbers from 2 to 20 on the console, you can do this:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

or

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

The output is in both cases:

2
4
6
8
10
12
14
16
18
20

No one mentioned anyMatch yet. This is the reason for this post.
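Note that 20 appears in the output even though it fails i < 20: peek runs before the terminal test, so the element that ends the stream is still seen. The sketch below makes that visible by recording what peek sees instead of printing it (the class and method names are just for illustration).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

final class PeekBoundary {
    // Records every element that reaches peek before allMatch stops the stream
    static List<Integer> seenBeforeStop(int bound) {
        List<Integer> seen = new ArrayList<>();
        IntStream.iterate(2, i -> i + 2)
                 .peek(seen::add)            // runs before the allMatch test
                 .allMatch(i -> i < bound);  // the first false result ends traversal
        return seen;
    }
}
```

seenBeforeStop(20) returns 2, 4, ..., 18, 20, so this pattern behaves like an inclusive "take until" rather than a strict takeWhile.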

Solution 9 - Java

This is the source copied from JDK 9 java.util.stream.Stream.takeWhile(Predicate), slightly modified so that it works with JDK 8.

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}

Solution 10 - Java

Here is a version done on ints - as asked in the question.

Usage:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

Here's code for StreamUtil:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamUtil
{
	public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
	{
		return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
	}
	
	private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
	{
		private final PrimitiveIterator.OfInt iterator;
		private final IntPredicate predicate;
		// Set once the predicate fails or the source runs out, so later calls cannot resume
		private boolean finished;
		
		public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
		{
			super(Long.MAX_VALUE, IMMUTABLE);
			this.iterator = stream.iterator();
			this.predicate = predicate;
		}

		@Override
		public boolean tryAdvance(IntConsumer action)
		{
			if (!finished && iterator.hasNext()) {
				int value = iterator.nextInt();
				if (predicate.test(value)) {
					action.accept(value);
					return true;
				}
			}
			
			finished = true;
			return false;
		}
	}
}

Solution 11 - Java

Use the AbacusUtil library. It provides exactly the API you want, and more:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

Disclosure: I'm the developer of AbacusUtil.

Solution 12 - Java

If you know the exact number of repetitions that will be performed, you can do

IntStream
          .iterate(1, n -> n + 1)
          .limit(10)
          .forEach(System.out::println);

Solution 13 - Java

IntStream.iterate(1, n -> n + 1)
    .peek(System.out::println) // it will be executed 9 times
    .filter(n -> n >= 9)
    .findAny();

Instead of peek, you can use mapToObj to return a final object or message:

IntStream.iterate(1, n -> n + 1)
    .mapToObj(n -> {   // it will be executed 9 times
        if (n < 9)
            return "";
        return "Loop repeats " + n + " times";
    })
    .filter(message -> !message.isEmpty())
    .findAny()
    .ifPresent(System.out::println);

Solution 14 - Java

You can't abort a stream except by a short-circuiting terminal operation, which would leave some stream values unprocessed regardless of their value. But if you just want to avoid operations on a stream you can add a transform and filter to the stream:

import java.util.Objects;

class ThingProcessor
{
    static Thing returnNullOnCondition(Thing thing)
    {    return( (*** is condition met ***)? null : thing);    }
    
    void processThings(Collection<Thing> thingsCollection)
    {
        thingsCollection.stream()
        *** regular stream processing ***
        .map(ThingProcessor::returnNullOnCondition)
        .filter(Objects::nonNull)
        *** continue stream processing ***
    }
} // class ThingProcessor

That transforms the things that meet some condition into nulls, then filters out the nulls. If you're willing to indulge in side effects, you could set the condition value to true once some thing is encountered, so all subsequent things are filtered out regardless of their value. Even without that, you can save much (if not quite all) of the processing by filtering out the values that you don't want to process.
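The side-effect variant described above might be sketched as follows. keepUntil and StopFlagFilter are hypothetical names, and the sketch assumes a sequential stream, since the flag trick depends on encounter order. It still walks the whole source, so it only saves downstream work and does not help with an infinite stream.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class StopFlagFilter {
    // Hypothetical helper: keeps elements until 'stopCondition' first holds,
    // then rejects everything that follows, whatever its value
    static <T> List<T> keepUntil(List<T> things, Predicate<? super T> stopCondition) {
        AtomicBoolean stopped = new AtomicBoolean(false);
        return things.stream() // sequential on purpose
            .filter(thing -> {
                if (stopped.get()) {
                    return false;           // already stopped: drop without testing
                }
                if (stopCondition.test(thing)) {
                    stopped.set(true);      // side effect: remember that we stopped
                    return false;
                }
                return true;
            })
            .collect(Collectors.toList());
    }
}
```

For example, keepUntil(Arrays.asList(1, 2, 3, 10, 4, 5), n -> n >= 10) yields [1, 2, 3]; the 4 and 5 are dropped because the flag was already set.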

Solution 15 - Java

I had a similar requirement: invoke the web service and, if it fails, attempt it up to 3 times. If it still fails after that many attempts, send an email notification. After a lot of googling, anyMatch() came to the rescue. My sample code is as follows. In this example, if the webServiceCall method returns true on the first iteration, the stream does not iterate further because we have called anyMatch(). I believe this is what you are looking for.

import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

class TrialStreamMatch {

    public static void main(String[] args) {
        // rangeClosed(1, 3) yields 1, 2, 3, i.e. up to three attempts;
        // range(1, 3) would only yield 1 and 2.
        // anyMatch stops at the first attempt that returns true.
        if (!IntStream.rangeClosed(1, 3).anyMatch(attempt -> webServiceCall(attempt))) {
            // Code for sending email notifications
        }
    }

    public static boolean webServiceCall(int i) {
        // For the time being, this generates a boolean randomly.
        // Replace this whole piece with the actual web-service client code.
        boolean bool = ThreadLocalRandom.current().nextBoolean();
        System.out.println("Iteration index :: " + i + " bool :: " + bool);

        // Return success status -- true or false
        return bool;
    }
}
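To check that anyMatch really stops at the first success, here is a deterministic sketch that counts the attempts instead of calling a service. attemptsMade is a hypothetical helper, and the IntPredicate stands in for the web-service call.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

final class RetryWithAnyMatch {
    // Hypothetical helper: runs 'attempt' for attempt numbers 1..maxAttempts
    // and returns how many attempts were actually made
    static int attemptsMade(int maxAttempts, IntPredicate attempt) {
        AtomicInteger calls = new AtomicInteger();
        IntStream.rangeClosed(1, maxAttempts)   // inclusive upper bound
                 .anyMatch(i -> {
                     calls.incrementAndGet();
                     return attempt.test(i);    // true ends the stream early
                 });
        return calls.get();
    }
}
```

With attemptsMade(3, i -> i == 2) only two attempts are made, and with an attempt that always fails all three are used. Note that IntStream.range(1, 3) excludes its upper bound and would allow only two attempts.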

Solution 16 - Java

If you have a different problem, a different solution may be needed, but for your current problem I would simply go with:

IntStream
    .iterate(1, n -> n + 1)
    .limit(9) // the numbers below 10 are 1 through 9
    .forEach(System.out::println);

Solution 17 - Java

Might be a bit off topic but this is what we have for List<T> rather than Stream<T>.

First you need a take utility method, which takes the first n elements:

static <T> List<T> take(List<T> l, int n) {
    if (n <= 0) {
        return newArrayList();
    } else {
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    }
}

It works just like scala.List.take:

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

Now it is fairly simple to write a takeWhile method based on take:

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements
}

It works like this:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

This implementation iterates over part of the list a few times, but it stays linear and does not add O(n^2) operations. Hope that's acceptable.
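If the repeated partial passes are a concern, a single-pass alternative is to count the matching prefix with a plain loop and return a subList. This is a sketch of that idea, not a drop-in replacement for the code above; the class name ListTakeWhile is made up.

```java
import java.util.List;
import java.util.function.Predicate;

final class ListTakeWhile {
    static <T> List<T> takeWhile(List<T> list, Predicate<? super T> p) {
        int end = 0;
        for (T element : list) {
            if (!p.test(element)) {
                break;          // first failing element ends the prefix
            }
            end++;
        }
        return list.subList(0, end); // a view backed by the original list
    }
}
```

It does not rely on indexOf, so it does not depend on how equals is defined for the elements.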

Solution 18 - Java

I have another quick solution (which is really unclean, in fact, but you get the idea):

public static void main(String[] args) {
	System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
			.map(o -> o.toString()).collect(Collectors.joining(", ")));
}

static interface TerminatedStream<T> {
	Stream<T> terminateOn(T e);
}

static class StreamUtil {
	static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
		return new TerminatedStream<T>() {
			public Stream<T> terminateOn(T e) {
				Builder<T> builder = Stream.<T> builder().add(seed);
				T current = seed;
				while (!current.equals(e)) {
					current = op.apply(current);
					builder.add(current);
				}
				return builder.build();
			}
		};
	}
}

Solution 19 - Java

Here is my attempt using just the Java Stream library.

        IntStream.iterate(0, i -> i + 1)
        .filter(n -> {
                if (n < 10) {
                    System.out.println(n);
                    return false;
                } else {
                    return true;
                }
            })
        .findAny();

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | MForster | View Question on Stackoverflow
Solution 1 - Java | Stuart Marks | View Answer on Stackoverflow
Solution 2 - Java | Louis Wasserman | View Answer on Stackoverflow
Solution 3 - Java | Michael Rowley | View Answer on Stackoverflow
Solution 4 - Java | Tagir Valeev | View Answer on Stackoverflow
Solution 5 - Java | Dominic Fox | View Answer on Stackoverflow
Solution 6 - Java | The Coordinator | View Answer on Stackoverflow
Solution 7 - Java | frhack | View Answer on Stackoverflow
Solution 8 - Java | gil.fernandes | View Answer on Stackoverflow
Solution 9 - Java | martian | View Answer on Stackoverflow
Solution 10 - Java | Chris Greenaway | View Answer on Stackoverflow
Solution 11 - Java | user_3380739 | View Answer on Stackoverflow
Solution 12 - Java | Dilip Tharoor | View Answer on Stackoverflow
Solution 13 - Java | Oleksandr Potomkin | View Answer on Stackoverflow
Solution 14 - Java | Matthew | View Answer on Stackoverflow
Solution 15 - Java | Chinmay Phadke | View Answer on Stackoverflow
Solution 16 - Java | krmanish007 | View Answer on Stackoverflow
Solution 17 - Java | Max | View Answer on Stackoverflow
Solution 18 - Java | user2504380 | View Answer on Stackoverflow
Solution 19 - Java | climbing_bum | View Answer on Stackoverflow