How to ignore error and continue infinite stream?

AndroidRx JavaRx Android

Android Problem Overview


I would like to know how to ignore exceptions and continue infinite stream (in my case stream of locations)?

I'm fetching current user position (using Android-ReactiveLocation) and then sending them to my API (using Retrofit).

In my case, when exception occurs during network call (e.g. timeout) onError method is invoked and stream stops itself. How to avoid it?

Activity:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}

Android Solutions


Solution 1 - Android

You may want to use one of the error handling operators.

  • onErrorResumeNext( ) — instructs an Observable to emit a sequence of items if it encounters an error
  • onErrorReturn( ) — instructs an Observable to emit a particular item when it encounters an error
  • onExceptionResumeNext( ) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
  • retry( ) — if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
  • retryWhen( ) — if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

Especialy retry and onExceptionResumeNext look promising in your case.

Solution 2 - Android

mRestService.postLocations(locations) emit one item, then complete. If an error occur, then it emit the error, which complete the stream.

As you call this method in a flatMap, the error continue to your "main" stream, and then your stream stops.

What you can do is to transform your error into another item (as described here : https://stackoverflow.com/a/28971140/476690 ), but not on your main stream (as I presume you already tried) but on the mRestService.postLocations(locations).

This way, this call will emit an error, that will be transformed to an item/another observable and then complete. (without calling onError).

On a consumer view, mRestService.postLocations(locations) will emit one item, then complete, like if everything succeed.

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can't throw exception
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();

Solution 3 - Android

If you just want to ignore the error inside the flatMap without returning an element do this:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty())
);

Solution 4 - Android

Just pasting the link info from @MikeN's answer incase it gets lost:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;
    
    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }
            
        };
    }
}

> and use it close to the observable source because other operators may > eagerly unsubscribe before that.

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe();

> Note, however, that this suppresses the error delivery from the > source. If any onNext in the chain after it throws an exception, it is > still likely the source will be unsubscribed.

Solution 5 - Android

This answer might be a bit late, but if anyone stumbles upon this, instead of reinventing a wheel can use the ready to use Relay lib by Jacke Wharton

https://github.com/JakeWharton/RxRelay

there is good documentation but in essence, Relay is A Subject except without the ability to call onComplete or onError.

and the options are:

BehaviorRelay

Relay that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.
    // observer will receive all events.
    BehaviorRelay<Object> relay = BehaviorRelay.createDefault("default");
    relay.subscribe(observer);
    relay.accept("one");
    relay.accept("two");
    relay.accept("three");

    // observer will receive the "one", "two" and "three" events, but not "zero"
    BehaviorRelay<Object> relay = BehaviorRelay.createDefault("default");
    relay.accept("zero");
    relay.accept("one");
    relay.subscribe(observer);
    relay.accept("two");
    relay.accept("three");

PublishRelay Relay that, once an Observer has subscribed, emits all subsequently observed items to the subscriber.

    PublishRelay<Object> relay = PublishRelay.create();
    // observer1 will receive all events
    relay.subscribe(observer1);
    relay.accept("one");
    relay.accept("two");
    // observer2 will only receive "three"
    relay.subscribe(observer2);
    relay.accept("three");

ReplayRelay Relay that buffers all items it observes and replays them to any Observer that subscribes.

    ReplayRelay<Object> relay = ReplayRelay.create();
    relay.accept("one");
    relay.accept("two");
    relay.accept("three");
    // both of the following will get the events from above
    relay.subscribe(observer1);
    relay.subscribe(observer2);

Solution 6 - Android

Try calling the rest service in a Observable.defer call. That way for every call you'll get a chance to use its own 'onErrorResumeNext' and the errors won't cause your main stream to complete.

reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
  .buffer(50)
  .flatMap(locations ->
    Observable.defer(() -> mRestService.postLocations(locations))
      .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>)
  )
........

That solution is originally from this thread -> https://stackoverflow.com/questions/28390878/rxjava-observable-and-subscriber-for-skipping-exception, but I think it will work in your case too.

Solution 7 - Android

Add my solution for this problem:

privider
    .compose(ignoreErrorsTransformer)
    .subscribe()

private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer =
        new Observable.Transformer<ResultType, ResultType>() {
            @Override
            public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) {
                return resultTypeObservable
                        .materialize()
                        .filter(new Func1<Notification<ResultType>, Boolean>() {
                            @Override
                            public Boolean call(Notification<ResultType> resultTypeNotification) {
                                return !resultTypeNotification.isOnError();
                            }
                        })
                        .dematerialize();

            }
        };

Solution 8 - Android

A slight modification of the solution (@MikeN) to enable finite streams to complete:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
                //this will allow finite streams to complete
                t1.onCompleted();
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}

Solution 9 - Android

Here is my kotlin extension function for ignoring errors

fun <T> Observable<T>.ignoreErrors(errorHandler: (Throwable) -> Unit) =
    retryWhen { errors ->
        errors
            .doOnNext { errorHandler(it) }
            .map { 0 }
    }

This utilizes retryWhen to indefinately re-subscribe to the upstream while still allowing you a method of handling the error in a non terminal way.

This feels dangerous

Solution 10 - Android

With Rxjava2, we could call the overloaded flatmap with delayErrors parameter: flatmap javadoc

When passing it as true:

> exceptions from the current Flowable and all inner Publishers are delayed until all of them terminate if false, the first one signaling an exception will terminate the whole sequence immediately

Solution 11 - Android

You can just skip error using onErrorComplete() method

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
    .buffer(50)
    .flatMapMaybe(locations -> Maybe.just(mRestService.postLocations(locations).onErrorComplete()) // skip item
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionZiemView Question on Stackoverflow
Solution 1 - AndroidtomrozbView Answer on Stackoverflow
Solution 2 - AndroiddwursteisenView Answer on Stackoverflow
Solution 3 - AndroidAlbert Vila CalvoView Answer on Stackoverflow
Solution 4 - AndroidAllDayAmazingView Answer on Stackoverflow
Solution 5 - Androidbastami82View Answer on Stackoverflow
Solution 6 - AndroidmeddleView Answer on Stackoverflow
Solution 7 - AndroidHotIceCreamView Answer on Stackoverflow
Solution 8 - AndroidportenezView Answer on Stackoverflow
Solution 9 - AndroidWilliam ReedView Answer on Stackoverflow
Solution 10 - AndroidYing CherryView Answer on Stackoverflow
Solution 11 - AndroidFranzFView Answer on Stackoverflow