Disposing

October 14, 2018

After calling the subscribe() method on an Observable to start receiving the emissions, these items pass through the Observable chain.
Finite Observables which call the onComplete(), typically free up the resources they were using for garbage collection.
But if we were using infinite or long runing Observables, we might be in a situation where we want to stop the emissions and dispose everything what is associated to our subscription.
As a best practice, the explicit disposal of subscriptions which are no longer needed is the best to avoid memory leaks.

Disposable

The Disposable interface has two methods:

  • dispose() to stop emissions and free up the resources used for that Observer.
  • isDisposed() to check if it has been disposed already

The methods onNext(), onComplete() and onError() of the Observable class return a Disposable:

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {

        Observable<Long> infiniteObservable = Observable.interval(1, TimeUnit.SECONDS);

        Disposable disposable = infiniteObservable.subscribe(s -> System.out.println(s + " second(s) passed!"));

        Thread.sleep(5500);

        disposable.dispose();
        System.out.println("No more emissions after this!");

        Thread.sleep(3000);
    }
}

Disposable within an Observer

It is possible to implement your own Observer and use custom logic in the onNext(), onComplete() and onError() methods, which all have access to the Disposable.dispose() method:

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class Main {
    public static void main(String[] args) {

        Observable<Long> observable = Observable.rangeLong(0, 10);

        Observer<Long> myCustomObserver = new Observer<Long>() {
            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                this.disposable = disposable;
            }

            @Override
            public void onNext(Long value) {
                System.out.println("Got item: " + value);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
                disposable.dispose();
            }

            @Override
            public void onComplete() {
                System.out.println("Calling dispose() in the Observer!");
                disposable.dispose();
            }
        };

        observable.subscribeWith(myCustomObserver);
    }
}

ResourceObserver

If you do not want to deal with Disposable directly and want RxJava to handle it, you can use the ResourceObserver class.
You can use the add(Disposable resource) method to add resources to the ResourceObserver.
When the dispose() method is called, these resources will be disposed of.
This can be done either by calling the dispose() method in onError() and onComplete() explicitely, or by capturing the Disposable which is returned on subscription and calling using it to free up the resources for garbage collection.

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.ResourceObserver;

public class Main {
    public static void main(String[] args) {

        Observable<Long> observable = Observable.rangeLong(0, 10);

        ResourceObserver<Long> myCustomObserver = new ResourceObserver<Long>() {
            @Override
            public void onNext(Long value) {
                System.out.println("Got item: " + value);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
                dispose();  //we have access to dispose()
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete() was called!");
                dispose();  //we have access to dispose()
            }
        };

        Disposable disposable = observable.subscribeWith(myCustomObserver);
        disposable.dispose();  //disposing can be handled here as well
    }
}
András Döbröntey

About the Author

András Döbröntey

Leave a Comment:

Bitnami