RxJava Operators: doOnSubscribe, doOnSuccess(), doOnDispose() and doFinally()

November 5, 2018

doOnSubscribe, doOnDispose() and doFinally()

With the help of the operators doOnSubscribe() and doOnDispose() it is possible to execute code when a subscription and disposal occur:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c", "d");

        myObservable.doOnSubscribe(disposable -> System.out.println("Subscribed!"))
                    .doOnDispose(() -> System.out.println("Disposing!"))
                    .subscribe(System.out::println);
    }
}

In the example above, if you use RxJava version of 2.0.5 or newer, subscribe disconnects the upstream Disposable when it receives a terminal event, so an already completed emission sequence cannot be disposed.
Hence the onDispose() will not be called. What is the solution? Use doFinally():

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c", "d");

        myObservable.doOnSubscribe(disposable -> System.out.println("Subscribed!"))
                    .doFinally(() -> System.out.println("Finally!"))
                    .subscribe(System.out::println);
    }
}

doOnSuccess()

Because Maybe and Single do not have an onNext(), only onSuccess() to pass a single emission, so we cannot use the doOnNext() operator.
Luckily, we have the doOnSuccess() operator:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c", "d");

        myObservable.reduce((concat, next) -> concat + next)
                    .doOnSuccess(x -> System.out.println(String.format("Emitting: '%s'", x)))
                    .subscribe(System.out::println);
    }
}

doOnSuccess() is not called if a Maybe emits no items:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.empty();

        myObservable.firstElement()
                    .doOnSuccess(x -> System.out.println(String.format("Emitting: '%s'", x)))
                    .subscribe(System.out::println);
    }
}
András Döbröntey

About the Author

András Döbröntey

Leave a Comment:

Bitnami