rxjava: Can I use retry() but with delay?


Java Problem Overview


I am using rxjava in my Android app to handle network requests asynchronously. Now I would like to retry a failed network request only after a certain time has passed.

Is there any way to use retry() on an Observable but to retry only after a certain delay?

Is there a way to let the Observable know that it is currently being retried (as opposed to tried for the first time)?

I had a look at debounce()/throttleWithTimeout() but they seem to be doing something different.

Edit:

I think I found one way to do it, but I'd be interested in either confirmation that this is the correct way to do it or for other, better ways.

What I am doing is this: in the call() method of my Observable.OnSubscribe, before I call the Subscriber's onError() method, I simply let the thread sleep for the desired amount of time. So, to retry every 1000 milliseconds, I do something like this:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
	    Log.d(TAG, "trying to load all products with pid: " + pid);
	    subscriber.onNext(productClient.getProductNodesForParentId(pid));
	    subscriber.onCompleted();
    } catch (Exception e) {
	    try {
		    Thread.sleep(1000);
	    } catch (InterruptedException e1) {
		    // Restore the interrupt flag instead of printing the unrelated exception e.
		    Thread.currentThread().interrupt();
	    }
	    subscriber.onError(e);
    }
}

Since this method is running on an IO thread anyway it does not block the UI. The only problem I can see is that even the first error is reported with delay so the delay is there even if there's no retry(). I'd like it better if the delay wasn't applied after an error but instead before a retry (but not before the first try, obviously).
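
The behaviour asked for here (report the first error without delay, wait only before each retry) can be sketched without RxJava. The helper below is a plain-JDK illustration only; DelayedRetry, callWithRetry and its parameters are hypothetical names that do not come from RxJava or from the code above.

```java
import java.util.concurrent.Callable;

final class DelayedRetry {
    /**
     * Runs the task up to maxAttempts times. The delay is applied only
     * before a retry, never before the first attempt and never after the
     * final failure. Blocks the calling thread, so it must not be called
     * on the UI thread.
     */
    static <T> T callWithRetry(Callable<T> task, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt > 1) {
                Thread.sleep(delayMillis); // wait only before a retry
            }
            try {
                return task.call();
            } catch (InterruptedException e) {
                throw e; // do not retry when the thread was interrupted
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("simulated network failure");
            }
            return "ok after " + calls[0] + " attempts";
        }, 5, 100);
        System.out.println(result);
    }
}
```

Because the sleep sits at the top of the loop and is skipped for the first attempt, a caller that passes maxAttempts = 1 sees the error immediately, which is the part the Thread.sleep() approach above does not give you.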

Java Solutions


Solution 1 - Java

You can use the retryWhen() operator to add retry logic to any Observable.

The following class contains the retry logic:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

Usage:

// Add retry logic to existing observable.
// With maxRetries = 3 the source is tried at most 3 times in total
// (the counter check allows 2 retries), with a 2 second delay before each retry.
observable
    .retryWhen(new RetryWithDelay(3, 2000));
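
One detail worth checking is how the counter check ++retryCount < maxRetries behaves, because the counter is also incremented on the failure that ends the sequence. The plain-Java sketch below replays only that check, without RxJava; RetryCountDemo and retriesGranted are hypothetical names used for illustration. It suggests that new RetryWithDelay(3, 2000) grants two resubscriptions, i.e. three attempts in total.

```java
final class RetryCountDemo {
    /**
     * Replays the check "++retryCount < maxRetries" from RetryWithDelay for a
     * source that fails on every attempt, and returns how many retries
     * (resubscriptions) are granted before the error is passed along.
     */
    static int retriesGranted(int maxRetries) {
        int retryCount = 0;
        int granted = 0;
        while (++retryCount < maxRetries) {
            granted++; // the timer would fire here and trigger a resubscription
        }
        return granted;
    }

    public static void main(String[] args) {
        for (int max = 1; max <= 4; max++) {
            System.out.println("maxRetries=" + max
                    + " -> retries granted: " + retriesGranted(max));
        }
    }
}
```

If exactly maxRetries retries are wanted, one option is to change the comparison in RetryWithDelay to <=.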

Solution 2 - Java

Inspired by Paul's answer, and if you are not concerned with the retryWhen problems described by Abhijit Sarkar, the simplest way to delay resubscription unconditionally with RxJava 2 is:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

You may want to see more samples and explanations on retryWhen and repeatWhen.

Solution 3 - Java

This example works with RxJava 2.2.2:

Retry without delay:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status));

Retry with delay:

Single.just(somePaylodData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status))
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));

Our source single fails if someConnection.send() fails. When that happens, the observable of failures inside retryWhen emits the error. We delay that emission by 300ms and send it back to signal a retry. take(5) guarantees that our signaling observable will terminate after we receive five errors. retryWhen sees the termination and doesn't retry after the fifth failure.

Solution 4 - Java

This is a solution based on Ben Christensen's snippets that I saw, RetryWhen Example and RetryWhenTestsConditional (I had to change n.getThrowable() to n for it to work). I used evant/gradle-retrolambda to make the lambda notation work on Android, but you don't have to use lambdas (although they are highly recommended). For the delay I implemented exponential back-off, but you can plug in whatever back-off logic you want there. For completeness I added the subscribeOn and observeOn operators. I'm using ReactiveX/RxAndroid for AndroidSchedulers.mainThread().

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
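
With ATTEMPT_COUNT = 10, the delays produced by Math.pow(2, ni.y) are 2, 4, 8, ... 1024 seconds, which add up to 2046 seconds (about 34 minutes) if every attempt fails. As one example of the "whatever back-off logic" mentioned above, the sketch below computes the same doubling schedule with an upper bound per retry. It uses only the JDK; BackoffSchedule, delaySeconds, schedule and capSeconds are hypothetical names, not part of RxJava or of this answer.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    /**
     * Delay in seconds before retry number retryIndex (1-based), doubling
     * each time as in Math.pow(2, i), but never exceeding capSeconds.
     */
    static long delaySeconds(int retryIndex, long capSeconds) {
        if (retryIndex < 1) {
            throw new IllegalArgumentException("retryIndex must be >= 1");
        }
        // Clamp the shift so large indexes cannot overflow a long.
        long uncapped = 1L << Math.min(retryIndex, 62);
        return Math.min(uncapped, capSeconds);
    }

    /** Delays for retries 1..attemptCount, in order. */
    static List<Long> schedule(int attemptCount, long capSeconds) {
        List<Long> delays = new ArrayList<>();
        for (int i = 1; i <= attemptCount; i++) {
            delays.add(delaySeconds(i, capSeconds));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Uncapped, as in the snippet above: 2, 4, 8, ... seconds.
        System.out.println(schedule(10, Long.MAX_VALUE));
        // Capped at 60 seconds per retry.
        System.out.println(schedule(10, 60));
    }
}
```

Inside the flatMap above, the capped value could then be passed to Observable.timer in place of (long) Math.pow(2, ni.y).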

Solution 5 - Java

Instead of using MyRequestObservable.retry, I use a wrapper function retryObservable(MyRequestObservable, retrycount, seconds), which returns a new Observable that handles the indirection for the delay, so I can do:

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
		final Observable<T> requestObservable, final int nbRetry,
		final long seconds) {

	return Observable.create(new Observable.OnSubscribe<T>() {

		@Override
		public void call(final Subscriber<? super T> subscriber) {
			requestObservable.subscribe(new Action1<T>() {

				@Override
				public void call(T arg0) {
					subscriber.onNext(arg0);
					subscriber.onCompleted();
				}
			},

			new Action1<Throwable>() {
				@Override
				public void call(Throwable error) {

					if (nbRetry > 0) {
						Observable.just(requestObservable)
								.delay(seconds, TimeUnit.SECONDS)
								.observeOn(mainThread())
								.subscribe(new Action1<Observable<T>>(){
									@Override
									public void call(Observable<T> observable){
										retryObservable(observable,
												nbRetry - 1, seconds)
												.subscribe(subscriber);
									}
								});
					} else {
						// still fail after retries
						subscriber.onError(error);
					}

				}
			});

		}

	});

}

Solution 6 - Java

Based on kjones' answer, here is a Kotlin version of the RxJava 2.x retry with a delay, written as an extension function. Replace Observable with Flowable to create the same extension for Flowable.

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

Then just call it on an observable: observable.retryWithDelay(3, 1000)

Solution 7 - Java

retryWhen is a complicated, perhaps even buggy, operator. The official doc and at least one answer here use the range operator, which will fail if there are no retries to be made. See my discussion with ReactiveX member David Karnok.

I improved upon kjones' answer by changing flatMap to concatMap and by adding a RetryDelayStrategy class. flatMap doesn't preserve the order of emission while concatMap does, which is important for delays with back-off. The RetryDelayStrategy, as the name indicates, lets the user choose from various modes of generating retry delays, including back-off. The code is available on my GitHub, complete with the following test cases:

  1. Succeeds on 1st attempt (no retries)
  2. Fails after 1 retry
  3. Attempts to retry 3 times but succeeds on 2nd hence doesn't retry 3rd time
  4. Succeeds on 3rd retry

See setRandomJokes method.
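
The RetryDelayStrategy class itself is not reproduced in this answer. To give a rough idea of what "various modes of generating retry delays" could look like, here is a minimal JDK-only sketch. DelayMode and delayMillis are hypothetical names chosen for this illustration and are not the author's API.

```java
/** Hypothetical delay modes; retry is 1-based (1 = first retry). */
enum DelayMode {
    /** Same delay before every retry. */
    CONSTANT {
        @Override
        long delayMillis(long baseMillis, int retry) {
            return baseMillis;
        }
    },
    /** Delay grows by baseMillis with each retry: base, 2*base, 3*base, ... */
    LINEAR {
        @Override
        long delayMillis(long baseMillis, int retry) {
            return baseMillis * Math.max(retry, 1);
        }
    },
    /** Delay doubles with each retry: base, 2*base, 4*base, ... */
    EXPONENTIAL {
        @Override
        long delayMillis(long baseMillis, int retry) {
            // Clamp the exponent so the shift stays well inside a long.
            int exponent = Math.min(Math.max(retry - 1, 0), 30);
            return baseMillis * (1L << exponent);
        }
    };

    abstract long delayMillis(long baseMillis, int retry);

    public static void main(String[] args) {
        for (DelayMode mode : values()) {
            StringBuilder line = new StringBuilder(mode + ":");
            for (int retry = 1; retry <= 4; retry++) {
                line.append(' ').append(mode.delayMillis(500, retry));
            }
            System.out.println(line);
        }
    }
}
```

A retryWhen handler could pick one of these modes and feed the result to a timer. Because each delay depends on the retry number, the errors need to be processed in order, which is the reason given above for preferring concatMap.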

Solution 8 - Java

Same answer as kjones', but updated to the latest version of RxJava 2.x ('io.reactivex.rxjava2:rxjava:2.1.3'):

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}

Usage:

// Add retry logic to existing observable.
// With maxRetries = 3 the source is tried at most 3 times in total
// (the counter check allows 2 retries), with a 2 second delay before each retry.
observable
    .retryWhen(new RetryWithDelay(3, 2000));

Solution 9 - Java

Now with RxJava version 1.0+ you can use zipWith to achieve retry with delay.

The following is a modified version of kjones' answer:

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to delay before each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}

Solution 10 - Java

You can add a delay to the Observable returned inside the retryWhen operator:

/**
 * Here we can see how onErrorResumeNext works: it emits an item when an error occurs in the pipeline and an exception is propagated.
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

You can see more examples here: https://github.com/politrons/reactive

Solution 11 - Java

Simply do it like this:

                  Observable.just("")
                            .delay(2, TimeUnit.SECONDS) //delay
                            .flatMap(new Func1<String, Observable<File>>() {
                                @Override
                                public Observable<File> call(String s) {
                                    L.from(TAG).d("postAvatar=");

                                    File file = PhotoPickUtil.getTempFile();
                                    if (file.length() <= 0) {
                                        throw new NullPointerException();
                                    }
                                    return Observable.just(file);
                                }
                            })
                            .retry(6)
                            .subscribe(new Action1<File>() {
                                @Override
                                public void call(File file) {
                                    postAvatar(file);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {

                                }
                            });

Solution 12 - Java

A Kotlin version for RxJava 1:

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}

Solution 13 - Java

(Kotlin) I slightly improved the code with exponential backoff and made the emission of Observable.range() defensive:

fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1)
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        } else {
            // Only signal an error when the attempt did not succeed.
            emitter.onError(RuntimeException("Test $attempt"))
        }
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
                .zipWith(Observable.range(1, maxCount)
                        .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                        BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
                )
                .flatMap { pair ->
                    if (pair.second >= maxCount) {
                        Observable.error(pair.first)
                    } else {
                        val delay = interval * 2F.pow(pair.second)
                        println("retry delay: $delay")
                        Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                    }
                }
    }

    //Code to print the result in terminal.
    sourceWithRetry
            .doOnComplete { println("Complete") }
            .doOnError({ println("Final Error: $it") })
            .blockingForEach { println("$it") }
}

Solution 14 - Java

If you need to print the retry count, you can use the example from RxJava's wiki page: https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

observable.retryWhen(errors ->
    // Count and increment the number of errors.
    errors.map(error -> 1).scan((i, j) -> i + j)  
       .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
       // Limit the maximum number of retries.
       .takeWhile(errorCount -> errorCount < retryCounts)   
       // Signal resubscribe event after some delay.
       .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS)));

Solution 15 - Java

Use retryWhen

     /**
     * Retry Handler Support
     * @param errors
     * @param predicate filter error 
     * @param maxTry
     * @param periodStrategy
     * @param timeUnit
     * @return 
     */
    private  Flowable<?> retrySupport(Flowable<Throwable> errors, Predicate<? super Throwable> predicate , Integer maxTry , Function<Long, Long> periodStrategy , TimeUnit timeUnit )
    {
        LongAdder errorCount = new LongAdder();
        return errors
                .doOnNext(e -> {
                    errorCount.increment();
                    long currentCount = errorCount.longValue();
                    boolean tryContinue = predicate.test(e) && currentCount < maxTry;
                    Logger.i("No. of errors: %d , %s",  currentCount,
                            tryContinue ? String.format("please wait %d %s.", periodStrategy.apply(currentCount), timeUnit.name()) : "skip and throw");
                    if(!tryContinue)
                        throw  e;
                } )
                .flatMapSingle(e -> Single.timer( periodStrategy.apply(errorCount.longValue()), timeUnit));
    }

Sample

    private Single<DeviceInfo> getErrorToken( String device)
    {
        return Single.error(  new IOException( "network is disconnect!" ) );
    }

// Only retry when an IOException is emitted.
// With maxTry = 5 the delays before the retries are 1s, 2s, 4s, 8s; the fifth error is rethrown.

this.getErrorToken( this.deviceCode )
     .retryWhen( error -> retrySupport( error, 
                 e-> e instanceof IOException,
                 5 , 
                 count-> (long)Math.pow(2,count-1),TimeUnit.SECONDS ) )
     .subscribe( deviceInfo1 -> Logger.i( "----Get Device Info---" ) ,
                 e -> Logger.e( e, "On Error" ) ,
                 () -> Logger.i("<<<<<no more>>>>>"));
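
The decision made inside doOnNext (retry with a delay, or rethrow) can be checked in isolation. The sketch below mirrors that logic with plain JDK types. RetryDecision and nextDelay are hypothetical names, and LongUnaryOperator stands in for the Function<Long, Long> used above.

```java
import java.io.IOException;
import java.util.OptionalLong;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;

final class RetryDecision {
    /**
     * Mirrors the decision made in retrySupport: given the error and how many
     * errors have been seen so far (1-based), returns the delay to wait before
     * resubscribing, or empty if the error should be rethrown.
     */
    static OptionalLong nextDelay(Throwable error, long errorCount, int maxTry,
                                  Predicate<? super Throwable> retryable,
                                  LongUnaryOperator periodStrategy) {
        boolean tryContinue = retryable.test(error) && errorCount < maxTry;
        return tryContinue
                ? OptionalLong.of(periodStrategy.applyAsLong(errorCount))
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        Predicate<Throwable> onlyIo = e -> e instanceof IOException;
        // Same schedule as (long) Math.pow(2, count - 1) in the sample above.
        LongUnaryOperator doubling = count -> 1L << (count - 1);
        for (long count = 1; count <= 5; count++) {
            System.out.println("error #" + count + " -> "
                    + nextDelay(new IOException("down"), count, 5, onlyIo, doubling));
        }
    }
}
```

With maxTry = 5 this yields a delay for errors 1 to 4 and an empty result for the fifth, and an error that does not match the predicate is never retried.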

Solution 16 - Java

This worked for me:

// Retry up to retryCount times, waiting 1 second before each retry.
observable.retryWhen(throwableFlowable -> {
                return throwableFlowable.take(retryCount).delay(1, TimeUnit.SECONDS);
            });

Solution 17 - Java

I'm a bit too late for this one, but just in case this could still be useful for someone, I created a Kotlin extension function for RxJava 2 that will retry with an exponential backoff strategy:

  private fun <T> Observable<T>.retryWithExponentialBackoff(): Observable<T> {
    val retriesSubject = BehaviorSubject.createDefault(0)
    return doOnNext { retriesSubject.onNext(0) }
        .retryWhen {
          it.withLatestFrom(retriesSubject) { _, retryCount ->
            retriesSubject.onNext(retryCount + 1)
            retryCount
          }.flatMap { retryCount ->
            when (retryCount) {
              MAX_RETRY_COUNT -> Observable.error(RuntimeException("Max number of retries reached"))
              else -> Observable.timer(2.0.pow(retryCount).toLong(), SECONDS)
            }
          }
        }
  }

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | david.mihola | View Question on Stackoverflow
Solution 1 - Java | kjones | View Answer on Stackoverflow
Solution 2 - Java | McX | View Answer on Stackoverflow
Solution 3 - Java | Erunafailaro | View Answer on Stackoverflow
Solution 4 - Java | david-hoze | View Answer on Stackoverflow
Solution 5 - Java | Alexis Contour | View Answer on Stackoverflow
Solution 6 - Java | JuliusScript | View Answer on Stackoverflow
Solution 7 - Java | Abhijit Sarkar | View Answer on Stackoverflow
Solution 8 - Java | Mihuilk | View Answer on Stackoverflow
Solution 9 - Java | Omkar | View Answer on Stackoverflow
Solution 10 - Java | paul | View Answer on Stackoverflow
Solution 11 - Java | Allen Vork | View Answer on Stackoverflow
Solution 12 - Java | Cody | View Answer on Stackoverflow
Solution 13 - Java | ultraon | View Answer on Stackoverflow
Solution 14 - Java | Angel Koh | View Answer on Stackoverflow
Solution 15 - Java | soapgu | View Answer on Stackoverflow
Solution 16 - Java | MohammedYakub Moriswala | View Answer on Stackoverflow
Solution 17 - Java | Jorge Cloquell Ribera | View Answer on Stackoverflow