RxJava Operators: flatMap()

November 7, 2018

flatMap()

The flatMap() operator is similar to map(), with one difference: map() transforms each emitted item into another item, whereas flatMap() transforms each item into an Observable.
It then merges the resulting Observables, as merge() would (this is the flattening part), so downstream subscribers still see a single stream.
One interesting consequence is that flatMap() can change the number of emissions. In the example below, the character "a" is converted to uppercase and wrapped in an Observable that emits a single item, while every other character is mapped to an Observable that emits it three times:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> mySource1 = Observable.just("a", "b", "c");

        mySource1.flatMap(x -> {
                    if("a".equals(x)){
                        return Observable.just(x.toUpperCase());
                    } else {
                        // String.repeat() requires Java 11 or later.
                        return Observable.fromArray(x.repeat(3).split(""));
                    }
                 })
                 .subscribe(System.out::println);
    }
}
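
To make the one-to-many mapping easier to follow, the same transformation can be mimicked with the JDK's own Stream.flatMap(), a synchronous analogue that needs no RxJava dependency. This is only a sketch of the flattening idea, not RxJava API, and the names FlatMapSketch and expand are made up for illustration:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapSketch {

    // Maps every item to a stream of one or three items, then flattens
    // the resulting streams into a single list.
    static List<String> expand(List<String> items) {
        return items.stream()
                .flatMap(x -> "a".equals(x)
                        ? Stream.of(x.toUpperCase())   // one item in, one item out
                        : Stream.of(x, x, x))          // one item in, three items out
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Three items go in, seven come out: A, b, b, b, c, c, c
        expand(List.of("a", "b", "c")).forEach(System.out::println);
    }
}
```

Because every inner Observable in the RxJava example above emits synchronously, it should print the same seven lines in the same order.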

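One thing worth noting is that because flatMap() subscribes to a separate Observable for every item, each of those inner Observables can run on its own Scheduler, which makes asynchronous and multithreaded processing possible. Keep in mind that in such cases the merged items may arrive in a different order than the source emitted them: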

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.Map;
import static java.util.Map.entry;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<String> mySource1 = Observable.just("a", "b", "c", "d", "e", "f");

        var map = Map.ofEntries(
                    entry("b", 2),
                    entry("c", 3),
                    entry("d", 4),
                    entry("e", 5),
                    entry("f", 6)
                  );

        mySource1.flatMap(x -> {
                    if("a".equals(x)){
                        return Observable.just(x.toUpperCase());
                    } else {
                        return Observable.just(map.get(x))
                                         .subscribeOn(Schedulers.newThread());
                    }
                 })
                 .subscribe(System.out::println);

        // Keep the main thread alive long enough for the worker threads to emit.
        Thread.sleep(500);
    }
}
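
The reordering is not specific to RxJava: it follows from doing the per-item work on separate threads and collecting results as they arrive. The sketch below is a plain-JDK analogy under that assumption (it does not use RxJava, and MergeOrderSketch and lookUpConcurrently are hypothetical names). It looks up each key on its own worker thread and records the values in arrival order:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MergeOrderSketch {

    // Looks up every key on a worker thread and returns the values in the
    // order the workers finished, which is not guaranteed to match the
    // order of the keys.
    static List<Integer> lookUpConcurrently(List<String> keys, Map<String, Integer> table) {
        List<Integer> arrivals = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newCachedThreadPool();
        for (String key : keys) {
            pool.execute(() -> arrivals.add(table.get(key)));
        }
        pool.shutdown();
        try {
            // Wait for the workers instead of sleeping for a fixed time.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(arrivals);
    }

    public static void main(String[] args) {
        Map<String, Integer> table = Map.of("b", 2, "c", 3, "d", 4, "e", 5, "f", 6);
        // The five values are always present, but their order can differ between runs.
        System.out.println(lookUpConcurrently(List.of("b", "c", "d", "e", "f"), table));
    }
}
```

Code that consumes such a merged stream should therefore rely on which items arrive, not on the order in which they arrive.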
András Döbröntey
