RxJava delay for each item of list emitted

Java Problem Overview


I'm struggling to implement something I assumed would be fairly simple in Rx.

I have a list of items, and I want to have each item emitted with a delay.

It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.

Here's some testing code. It groups items in a list. Each group should then have a delay applied before being emitted.

long timeNow = System.currentTimeMillis(); // reference point for the elapsed times printed below
Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

The result is:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

But what I would expect to see is something like this:

174ms
[5]
 
230ms
[2]
 
285ms
[1]
 
345ms
[3]
 
399ms
[4]
 

What am I doing wrong?

Java Solutions


Solution 1 - Java

The simplest way to do this seems to be to use concatMap and wrap each item in a delayed Observable.

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i-> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
        .toCompletable().await();

Prints:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms

Solution 2 - Java

One way to do it is to use zip to combine your observable with an interval observable, so that each item has to wait for the next tick before it is emitted.

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

Solution 3 - Java

Just sharing a simple approach to emit each item in a collection with an interval:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);

One item is emitted every 500 milliseconds.

Solution 4 - Java

For Kotlin users, I wrote an extension function for the 'zip with interval' approach:

import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit

fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
    Observable.zip(
        this, 
        Observable.interval(interval, timeUnit), 
        BiFunction { item, _ -> item }
    )

It works the same way, but this makes it reusable. Example:

Observable.range(1, 5)
    .delayEach(1, TimeUnit.SECONDS)

Solution 5 - Java

I think this is exactly what you need. Take a look:

long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
                .timestamp(TimeUnit.MILLISECONDS)
                .subscribe(emitTime -> {
                    System.out.println(emitTime.time() - startTime);
                });

Solution 6 - Java

The following is useful for introducing a delay between emitted items:

List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));

Observable.fromIterable(letters)
                .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
                        .take(1)
                        .map(second -> item))
                .subscribe(System.out::println);

More good options are discussed at https://github.com/ReactiveX/RxJava/issues/3505

Solution 7 - Java

You can implement a custom Rx operator, such as MinRegularIntervalDelayOperator, and then use it with the lift function:

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .lift(new MinRegularIntervalDelayOperator<Integer>(50L))
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

Solution 8 - Java

Observable.just("A", "B", "C", "D", "E", "F")
    .flatMap { item ->
        // Blocks the emitting thread for 2 seconds before passing each item on
        Thread.sleep(2000)
        Observable.just(item)
    }
    .subscribe { println(it) }

Solution 9 - Java

To delay each group you can change your flatMap() to return an Observable that delays emitting the group.

Observable
        .range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g ->
                Observable
                        .timer(50, TimeUnit.MILLISECONDS)
                        .flatMap(t -> g.toList())
        )
        .doOnNext(item -> {
            System.out.println(System.currentTimeMillis() - timeNow);
            System.out.println(item);
            System.out.println(" ");
        }).toList().toBlocking().first();

Solution 10 - Java

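A not-so-clean way is to make the delay grow with each item using the .delay(Func1) operator.

Observable.range(1, 5)
            // delay(Func1) expects an Observable per item; the item is released when that Observable emits
            .delay(n -> Observable.timer(n * 50, TimeUnit.MILLISECONDS))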
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

Solution 11 - Java

There is another way to do it using concatMap. Since concatMap returns an observable of the source items, we can add the delay to that observable.

Here is what I have tried:

Observable.range(1, 5)
          .groupBy(n -> n % 5)
          .concatMap(integerIntegerGroupedObservable ->
          integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
          .doOnNext(item -> {
                    System.out.println(System.currentTimeMillis() - timeNow);
                    System.out.println(item);
                    System.out.println(" ");
                }).toList().toBlocking().first(); 

Solution 12 - Java

You can use

   Observable.interval(1, TimeUnit.SECONDS)
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return aLong.intValue() + 1;
                }
            })
            .startWith(0)
            .take(listInput.size())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer index) throws Exception {
                    Log.d(TAG, "---index of your list --" + index);
                }
            });

I am sure the code above does not emit duplicate values (indexes).

Solution 13 - Java

A Swift extension for both approaches suggested in this post.

Concat

import RxSwift

extension Observable {
    public func delayEach(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
        return self.concatMap { Observable.just($0).delay(dueTime, scheduler: scheduler) }
    }
}

Zip

import RxSwift

extension Observable {
    public func delayEach(_ period: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
        return Observable.zip(
            Observable<Int>.interval(period, scheduler: scheduler),
            self
        ) { $1 }
    }
}

Usage

Observable.range(start: 1, count: 5)
    .delayEach(.seconds(1), scheduler: MainScheduler.instance)

My personal preference is the concat approach, since it also works as expected when the upstream emits items at a slower rate than the delay interval.
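One reading of that remark can be sketched without any Rx library. The plain-Java model below is only an illustration under stated assumptions: the class and method names are hypothetical, arrival times are given in milliseconds in non-decreasing order, the interval starts ticking at time 0, and scheduling overhead is ignored. It compares when items would come out of a "zip with interval" pipeline and a "concat with delay" pipeline if the upstream pauses for a while.

```java
import java.util.Arrays;

// Hypothetical timing model, not an Rx implementation.
class DelayEachModel {

    // zip with interval: item i is paired with tick i, which fires at (i + 1) * period.
    // The pair is emitted as soon as both the item and its tick exist.
    static long[] zipWithInterval(long[] arrivals, long period) {
        long[] out = new long[arrivals.length];
        for (int i = 0; i < arrivals.length; i++) {
            long tick = (i + 1) * period;
            out[i] = Math.max(arrivals[i], tick);
        }
        return out;
    }

    // concat with delay: item i starts its own delay once it has arrived
    // and the previous item has been emitted.
    static long[] concatWithDelay(long[] arrivals, long delay) {
        long[] out = new long[arrivals.length];
        long previous = Long.MIN_VALUE;
        for (int i = 0; i < arrivals.length; i++) {
            out[i] = Math.max(previous, arrivals[i]) + delay;
            previous = out[i];
        }
        return out;
    }

    public static void main(String[] args) {
        // The first item arrives immediately, the rest arrive together after a 3 second pause.
        long[] arrivals = {0, 3000, 3000, 3000};
        System.out.println(Arrays.toString(zipWithInterval(arrivals, 1000))); // [1000, 3000, 3000, 4000]
        System.out.println(Arrays.toString(concatWithDelay(arrivals, 1000))); // [1000, 4000, 5000, 6000]
    }
}
```

In this model the zip variant lets two items through at the same instant, because the ticks that fired during the pause are already waiting, while the concat variant keeps at least one delay between consecutive items.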

And yes the original post is RxJava specific, but Google also brings you here for RxSwift queries.

Solution 14 - Java

Regarding eis' comment "wonder why isn't any of the answers actually answering the question. Why isn't this working, what's wrong with it?":

It's behaving differently than expected because delaying an item means its emission time is delayed relative to the time the item would otherwise be emitted - not relative to the previous item.

Imagine the OP's observable without any delay: all items are emitted in quick succession (in the same millisecond). With delay, each item is emitted later, but since the same delay is applied to each item, their relative emission times do not change. They are still emitted within the same millisecond.

Think of a person entering the room at 14:00. Another person enters at 14:01. If you apply a delay of one hour to both, they enter at 15:00 and 15:01. There is still just one minute between them.
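The room analogy can be checked with a few lines of plain Java. This is only a sketch of the arithmetic, not of any Rx operator: the class and method names are made up for illustration, and times are expressed as minutes since midnight (14:00 is 840, 14:01 is 841).

```java
import java.util.Arrays;

// Hypothetical helper showing that a uniform delay preserves the gaps between items.
class UniformDelayGaps {

    // Shifts every emission time by the same delay, which is how delay() is described above.
    static long[] shiftAll(long[] times, long delay) {
        long[] out = new long[times.length];
        for (int i = 0; i < times.length; i++) {
            out[i] = times[i] + delay;
        }
        return out;
    }

    // Computes the gap between each pair of consecutive emission times.
    static long[] gaps(long[] times) {
        long[] out = new long[Math.max(0, times.length - 1)];
        for (int i = 1; i < times.length; i++) {
            out[i - 1] = times[i] - times[i - 1];
        }
        return out;
    }

    public static void main(String[] args) {
        long[] before = {840, 841};            // 14:00 and 14:01
        long[] after = shiftAll(before, 60);   // one hour later
        System.out.println(Arrays.toString(after));        // [900, 901], i.e. 15:00 and 15:01
        System.out.println(Arrays.toString(gaps(before))); // [1]
        System.out.println(Arrays.toString(gaps(after)));  // [1]
    }
}
```

The gap stays at one minute in both cases, which is why a single delay() on the whole stream cannot space the items out.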

Solution 15 - Java

I think you want this:

Observable.range(1, 5)
            .delay(50, TimeUnit.MILLISECONDS)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

This way it will delay the numbers going into the group rather than delaying the reduced list by 50 milliseconds.

Solution 16 - Java

You can add a delay between emitted items by using flatMap with maxConcurrent and delay().

Here is an example that emits 0..4 with a delay between items:

@Test
fun testEmitWithDelays() {
    val DELAY = 500L
    val COUNT = 5

    val latch = CountDownLatch(1)
    val startMoment = System.currentTimeMillis()
    var endMoment : Long = 0

    Observable
        .range(0, COUNT)
        .flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
        .subscribe(
                { println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
                {},
                {
                    endMoment = System.currentTimeMillis()
                    latch.countDown()
                })

    latch.await()

    assertTrue { endMoment - startMoment >= DELAY * COUNT }
}

... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547

Solution 17 - Java

You should be able to achieve this by using the timer operator. I tried with delay but couldn't achieve the desired output. Note the nested operations inside the flatMap operator.

    Observable.range(1,5)
            .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                        .map(y -> x))
            // attach timestamp
            .timestamp()
            .subscribe(timedIntegers ->
                    Log.i(TAG, "Timed String: "
                            + timedIntegers.value()
                            + " "
                            + timedIntegers.time()));

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | athor | View Question on Stackoverflow
Solution 1 - Java | Magnus | View Answer on Stackoverflow
Solution 2 - Java | iagreen | View Answer on Stackoverflow
Solution 3 - Java | Mina Wissa | View Answer on Stackoverflow
Solution 4 - Java | Tim | View Answer on Stackoverflow
Solution 5 - Java | Vlad | View Answer on Stackoverflow
Solution 6 - Java | yaircarreno | View Answer on Stackoverflow
Solution 7 - Java | Matias Irland Tomas | View Answer on Stackoverflow
Solution 8 - Java | Vairavan | View Answer on Stackoverflow
Solution 9 - Java | kjones | View Answer on Stackoverflow
Solution 10 - Java | Tushar Nallan | View Answer on Stackoverflow
Solution 11 - Java | Sanket Kachhela | View Answer on Stackoverflow
Solution 12 - Java | Duy Phan | View Answer on Stackoverflow
Solution 13 - Java | Houwert | View Answer on Stackoverflow
Solution 14 - Java | Dabbler | View Answer on Stackoverflow
Solution 15 - Java | FriendlyMikhail | View Answer on Stackoverflow
Solution 16 - Java | cVoronin | View Answer on Stackoverflow
Solution 17 - Java | Abhishek Bansal | View Answer on Stackoverflow