RxJava delay for each item of list emitted
JavaRx JavaDelayJava Problem Overview
I'm struggling to implement something I assumed would be fairly simple in Rx.
I have a list of items, and I want to have each item emitted with a delay.
It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.
Here's some testing code. It groups items in a list. Each group should then have a delay applied before being emitted.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
The result is:
154ms
[5]
155ms
[2]
155ms
[1]
155ms
[3]
155ms
[4]
But what I would expect to see is something like this:
174ms
[5]
230ms
[2]
285ms
[1]
345ms
[3]
399ms
[4]
What am I doing wrong?
Java Solutions
Solution 1 - Java
The simplest way to do this seems to be just using concatMap
and wrapping each item in a delayed Obserable.
long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();
Prints:
Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
Solution 2 - Java
One way to do it is to use zip
to combine your observable with an Interval
observable to delay the output.
Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Solution 3 - Java
Just sharing a simple approach to emit each item in a collection with an interval:
Observable.just(1,2,3,4,5)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
.subscribe(System.out::println);
Each item will be emitted every 500 milliseconds
Solution 4 - Java
For kotlin users, I wrote an extension function for the 'zip with interval' approach
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
Observable.zip(
this,
Observable.interval(interval, timeUnit),
BiFunction { item, _ -> item }
)
It works the same way, but this makes it reusable. Example:
Observable.range(1, 5)
.delayEach(1, TimeUnit.SECONDS)
Solution 5 - Java
I think it's exactly what you need. Take look:
long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.MILLISECONDS)
.subscribe(emitTime -> {
System.out.println(emitTime.time() - startTime);
});
Solution 6 - Java
To introduce delay between each item emitted is useful:
List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
Observable.fromIterable(letters)
.concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
.take(1)
.map(second -> item))
.subscribe(System.out::println);
More good options at https://github.com/ReactiveX/RxJava/issues/3505
Solution 7 - Java
You can implement a [custom rx operator] 2 such as MinRegularIntervalDelayOperator
and then use this with the lift
function
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.lift(new MinRegularIntervalDelayOperator<Integer>(50L))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Solution 8 - Java
Observable.just("A", "B", "C", "D", "E", "F")
.flatMap { item -> Thread.sleep(2000)
Observable.just( item ) }
.subscribe { println( it ) }
Solution 9 - Java
To delay each group you can change your flatMap()
to return an Observable that delays emitting the group.
Observable
.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g ->
Observable
.timer(50, TimeUnit.MILLISECONDS)
.flatMap(t -> g.toList())
)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Solution 10 - Java
A not so clean way is to make the delay change with the iteration using the .delay(Func1) operator.
Observable.range(1, 5)
.delay(n -> n*50)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Solution 11 - Java
There is other way to do it using concatMap as concatMap returns observable of source items. so we can add delay on that observable.
here what i have tried.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.concatMap(integerIntegerGroupedObservable ->
integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Solution 12 - Java
You can use
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return aLong.intValue() + 1;
}
})
.startWith(0)
.take(listInput.size())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer index) throws Exception {
Log.d(TAG, "---index of your list --" + index);
}
});
This code above not duplicate value(index). "I'm sure"
Solution 13 - Java
A Swift extension for both approaches suggested in this post.
Concat
import RxSwift
extension Observable {
public func delayEach(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
return self.concatMap { Observable.just($0).delay(dueTime, scheduler: scheduler) }
}
}
Zip
import RxSwift
extension Observable {
public func delayEach(_ period: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
return Observable.zip(
Observable<Int>.interval(period, scheduler: scheduler),
self
) { $1 }
}
}
Usage
Observable.range(start: 1, count: 5)
.delayEach(.seconds(1), scheduler: MainScheduler.instance)
My personal preference goes out to the concat approach since it will also work as expected when the upstream emits items at a slower rate than the delay interval.
And yes the original post is RxJava specific, but Google also brings you here for RxSwift queries.
Solution 14 - Java
Regarding eis' comment "wonder why isn't any of the answers actually answering the question. Why isn't this working, what's wrong with it?":
It's behaving differently than expected because delaying an item means its emission time is delayed relative to the time the item would otherwise be emitted - not relative to the previous item.
Imagine the OP's observable without any delay: All items are emitted in quick succession (in the same millisecond). With delay, each item is emitted later. But since the same delay is applied to each item, their relative emission times do not change. They are still emitted in one millisecond.
Think of a person entering the room at 14:00. Another person enters at 14:01. If you apply a delay of one hour to both, they enter at 15:00 and 15:01. There is still just one minute between them.
Solution 15 - Java
I think you want this:
Observable.range(1, 5)
.delay(50, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
This way it will delay the numbers going into the group rather than delaying the reduced list by 5 seconds.
Solution 16 - Java
You can add a delay between emitted items by using flatMap, maxConcurrent and delay()
Here is an example - emit 0..4 with delay
@Test
fun testEmitWithDelays() {
val DELAY = 500L
val COUNT = 5
val latch = CountDownLatch(1)
val startMoment = System.currentTimeMillis()
var endMoment : Long = 0
Observable
.range(0, COUNT)
.flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
.subscribe(
{ println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
{},
{
endMoment = System.currentTimeMillis()
latch.countDown()
})
latch.await()
assertTrue { endMoment - startMoment >= DELAY * COUNT }
}
... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547
Solution 17 - Java
you should be able to achieve this by using Timer
operator. I tried with delay
but couldn't achieve the desired output. Note nested operations done in flatmap
operator.
Observable.range(1,5)
.flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
.map(y -> x))
// attach timestamp
.timestamp()
.subscribe(timedIntegers ->
Log.i(TAG, "Timed String: "
+ timedIntegers.value()
+ " "
+ timedIntegers.time()));