RxJava Observables Intro – create(), just(), fromIterable(), fromArray()

October 9, 2018

Observables

Observables are push-based iterators. They push items (called emissions) through a series of operators, until these items arrive to a final Observer, which will consume the items.
The three most important methods when using an Observable are the following

  • onNext(): this method passes each item, one at a time from a given source to the Observer
  • onComplete(): when the communication is completed, then this will inform the Observer that no more onNext() methods will occur
  • onError(): when an error happens, this will inform the Observer and it can decide how to handle it

How to create an Observable with create()

There are multiple ways to create an observable. In a previous post we have seen the just() method, but to get a better understanding of Observables and how the three methods mentioned above come together, it is useful to get familiar with the create() method.
We are going to use the factory Observable.create(), by passing a Lambda to represent the emitter. On this emitter we are going to call the onNext() to pass emissions, then at the end to signal the completion of the communication, we call the onComplete().

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Observable<String> myObservable = Observable.create(emitter -> {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
            emitter.onNext("D");
            emitter.onNext("E");
            emitter.onComplete();
        });

        myObservable.subscribe(System.out::println);
    }
}

An Observable can be infinite, this just means that the onComplete() is never called.

Short introduction to onError()

The Observer might handle the exceptions, as you can see it in the example below, handling it in a way that we all know it shouldn’t, but we tend to do it anyway 🙂
So without further ado, here is a error handling in the Observer with a printStackTrace():

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Observable<String> myObservable = Observable.create(emitter -> {
            try {
                emitter.onNext("A");
                emitter.onNext("B");
                emitter.onNext("C");
                emitter.onNext("D");
                emitter.onNext("E");
                throw new Exception("This is an error from the Observable!");
            } catch (Exception e) {
                emitter.onError(e);
            }
        });

        myObservable.subscribe(System.out::println, Throwable::printStackTrace);
    }
}

How to create an Observable with just()

Observable.create() is not used too often, because there are more comfortable factory methods available. One such of these methods is the just() method we have already seen in a previous post.

How to create an Observable with fromIterable() and fromArray()

With the help of Observable.fromIterable() we can emit items from any Iterable, such as Lists. This method takes care of calling the onNext() and onComplete() methods for us:

import io.reactivex.Observable;
import java.util.Arrays;

public class Main {
    public static void main(String[] args) {
        Observable<String> myObservable = Observable.fromIterable(Arrays.asList("A", "B", "C", "D", "E"));

        myObservable.subscribe(System.out::println, Throwable::printStackTrace);
    }
}

Similarly, we could use the fromArray() as well:

import io.reactivex.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<String> myObservable = Observable.fromArray(new String[]{"A", "B", "C", "D", "E"});

        myObservable.subscribe(System.out::println, Throwable::printStackTrace);
    }
}

The contract

The Observable contract states that the emissions must be passed sequentially and one at a time. This means that emissions cannot be passed by an Observable in a parallel way.
This seems like a limitation, but it simplifies RxJava and we can still leverage concurrency without breaking this contract.

András Döbröntey

About the Author

András Döbröntey

Leave a Comment:

Bitnami