RxJava Operators: Error recovery

October 30, 2018

Operators for error recovery

Errors might occur while an Observable chain is being processed. When one does, an onError() event is passed down the chain to the Observer, the subscription terminates, and no more emissions occur.
Sometimes we want to handle these exceptions before they reach the Observer, for example to attempt some kind of recovery.

onErrorReturn() and onErrorReturnItem()

When we want to use a default value for recovery, we can use the onErrorReturnItem() operator:

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.just(7, 5, 2, 0, 3, 9);

        myObservable.map(x -> 100 / x)
                    .onErrorReturnItem(-1)
                    .subscribe(x -> System.out.println("Received: " + x),
                               e -> System.out.println("Error: " + e));
    }
}

The placement of onErrorReturn() and onErrorReturnItem() matters: placed before the map() operator where the error occurs, they would not catch it. To intercept an emitted error, these operators must sit downstream of the operator that raises it.
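To make the placement rule concrete, here is a minimal sketch that moves onErrorReturnItem() upstream of map(). The class name PlacementDemo and the run() helper are made up for this illustration; the events are collected in a list only so the result is easy to inspect.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

class PlacementDemo {
    // Collects everything the Observer sees when the fallback sits upstream of map().
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Observable.just(7, 5, 2, 0, 3, 9)
                  .onErrorReturnItem(-1)   // upstream of the failure, so it has nothing to intercept
                  .map(x -> 100 / x)       // throws ArithmeticException for 0
                  .subscribe(x -> events.add("Received: " + x),
                             e -> events.add("Error: " + e));

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With this ordering the fallback value never appears: the Observer receives 14, 20 and 50, and then the ArithmeticException arrives in its error handler.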

onErrorReturn() accepts a lambda instead of a fixed fallback value, so the returned item can be derived from the Throwable:

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.just(7, 5, 2, 0, 3, 9);

        myObservable.map(x -> 100 / x)
                    .onErrorReturn(e -> {
                        System.out.println(String.format("An error occurred: '%s', falling back to default value", e.getMessage()));
                        return -1;
                    })
                    .subscribe(x -> System.out.println("Received: " + x),
                               e -> System.out.println("Error: " + e));
    }
}

As you can see, even though the error was handled, the emissions stopped after it: the remaining items 3 and 9 were never processed. If we don’t want this, we need to handle the error inside the map() operator itself:

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.just(7, 5, 2, 0, 3, 9);

        myObservable.map(x -> {
                        try {
                            return 100 / x;
                        } catch (ArithmeticException e) {
                            System.out.println(String.format("An error occurred: '%s', falling back to default value", e.getMessage()));
                            return -1;
                        }
                    })
                    .subscribe(x -> System.out.println("Received: " + x),
                               e -> System.out.println("Error: " + e));
    }
}

onErrorResumeNext()

The onErrorResumeNext() operator is very similar to the previous two operators, but instead of a single item it accepts an Observable (or a function that returns one), which can emit multiple values in case of an error.
For example, if we want to stop the emissions quietly, we can return Observable.empty():

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.just(7, 5, 2, 0, 3, 9);

        myObservable.map(x -> 100 / x)
                    .onErrorResumeNext(e -> {
                        System.out.println(String.format("An error occurred: '%s', using fallback Observable", e.getMessage()));
                        return Observable.empty();
                    })
                    .subscribe(x -> System.out.println("Received: " + x),
                               e -> System.out.println("Error: " + e));
    }
}
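The fallback does not have to be empty. As a small sketch of the "multiple values" case, the example below passes a fallback Observable directly to onErrorResumeNext(). The class name ResumeNextDemo, the run() helper and the fallback values -1 and -2 are arbitrary choices for this illustration.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

class ResumeNextDemo {
    // Returns every item the Observer receives, including the fallback items.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();

        Observable.just(7, 5, 2, 0, 3, 9)
                  .map(x -> 100 / x)                            // fails on 0
                  .onErrorResumeNext(Observable.just(-1, -2))   // switch to the fallback source on error
                  .subscribe(x -> received.add(x),
                             e -> System.out.println("Error: " + e));

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [14, 20, 50, -1, -2]
    }
}
```

The original source is still abandoned after the error, so 3 and 9 are not processed; the stream simply continues with the fallback items and then completes normally.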

retry(), retryUntil(), retryWhen()

We already used repeatWhen() to retry, but there is a dedicated operator for this: retry() with multiple overloads.
When used without any parameters, it will resubscribe indefinitely on any error:

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.just(7, 5, 2, 0, 3, 9);

        myObservable.map(x -> 100 / x)
                    .retry()
                    .subscribe(x -> System.out.println("Received: " + x),
                               e -> System.out.println("Error: " + e));
    }
}

Because the division by zero fails again on every resubscription, the example above never terminates. It is generally safer to give retry() a maximum number of retries:

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.just(7, 5, 2, 0, 3, 9);

        myObservable.map(x -> 100 / x)
                    .retry(2)
                    .subscribe(x -> System.out.println("Received: " + x),
                               e -> System.out.println("Error: " + e));
    }
}
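A bounded retry is most useful when the failure is transient. The following sketch assumes a hypothetical source that fails on its first two subscriptions and succeeds on the third; the class name TransientRetryDemo, the attempts counter and the value 42 are invented for this illustration.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class TransientRetryDemo {
    // Counts how many times the source was subscribed to.
    static final AtomicInteger attempts = new AtomicInteger(0);

    static List<Integer> run() {
        attempts.set(0);
        List<Integer> received = new ArrayList<>();

        // Hypothetical flaky source: throws on the first two subscriptions.
        Observable<Integer> flaky = Observable.fromCallable(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return 42;
        });

        flaky.retry(2) // the initial attempt plus at most two retries
             .subscribe(x -> received.add(x),
                        e -> System.out.println("Error: " + e));

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());          // [42]
        System.out.println(attempts.get()); // 3
    }
}
```

Here the two retries are exactly enough, so the Observer never sees the error. With retry(1) the second failure would reach the error handler instead.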

With the retryUntil() operator it is possible to provide a lambda that returns a boolean. The source is retried until the lambda returns true:

import io.reactivex.Observable;

import java.util.concurrent.atomic.AtomicInteger;

public class Main {
    public static void main(String[] args) {
        AtomicInteger retryCounter = new AtomicInteger(0);

        Observable<Integer> myObservable = Observable.just(7, 5, 2, 0, 3, 9);

        myObservable.map(x -> 100 / x)
                    .retryUntil(() -> retryCounter.incrementAndGet() > 3)
                    .subscribe(x -> System.out.println("Received: " + x),
                               e -> System.out.println("Error: " + e));
    }
}
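To see how often the source is actually resubscribed with this stop condition, the sketch below counts subscriptions with doOnSubscribe(). The class name RetryUntilCountDemo and the countSubscriptions() helper are hypothetical; the chain itself mirrors the example above.

```java
import io.reactivex.Observable;

import java.util.concurrent.atomic.AtomicInteger;

class RetryUntilCountDemo {
    // Returns the number of times the source was subscribed to before the error was let through.
    static int countSubscriptions() {
        AtomicInteger retryCounter = new AtomicInteger(0);
        AtomicInteger subscriptions = new AtomicInteger(0);

        Observable.just(7, 5, 2, 0, 3, 9)
                  .doOnSubscribe(d -> subscriptions.incrementAndGet())   // runs on every (re)subscription
                  .map(x -> 100 / x)
                  .retryUntil(() -> retryCounter.incrementAndGet() > 3)  // true means: stop retrying
                  .subscribe(x -> { },
                             e -> System.out.println("Error: " + e));

        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println(countSubscriptions()); // 4
    }
}
```

The stop condition is evaluated once per error. It returns false for the first three errors, so the source is retried three times, and the fourth error is passed on to the Observer.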

With the retryWhen() operator it is possible to implement retry logic with an increasing delay:

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Observable<Integer> myObservable = Observable.just(7, 5, 2, 0, 3, 9);

        myObservable.map(x -> 100 / x)
                    .retryWhen(throwableObservable ->
                            throwableObservable.zipWith(Observable.range(1, 3), (throwable, integer) -> integer)
                                               .flatMap(integer -> Observable.timer(integer, TimeUnit.SECONDS))
                    )
                    .subscribe(x -> System.out.println("Received: " + x),
                               e -> System.out.println("Error: " + e));

        Thread.sleep(8000);
    }
}
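The delays in this example come from zipping each error with Observable.range(1, 3), so the waits are 1, 2 and 3 seconds. The plain-Java sketch below only reproduces that arithmetic, without RxJava; the class name RetryDelaySchedule and its methods are hypothetical helpers for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RetryDelaySchedule {
    // Delay in seconds before each retry, mirroring the values of Observable.range(1, maxRetries).
    static List<Long> delaysSeconds(int maxRetries) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            delays.add((long) attempt);
        }
        return delays;
    }

    // Sum of all delays, i.e. roughly how long the retries keep the chain alive.
    static long totalSeconds(int maxRetries) {
        long total = 0;
        for (long delay : delaysSeconds(maxRetries)) {
            total += delay;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(delaysSeconds(3)); // [1, 2, 3]
        System.out.println(totalSeconds(3));  // 6
    }
}
```

The waits add up to about 6 seconds, which is why the 8-second Thread.sleep() in the example leaves enough room. The sleep is needed because Observable.timer() runs on the computation scheduler by default, and its daemon threads would not keep the JVM alive once main() returns.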

 

András Döbröntey
