RxJava Operators: Merging

November 6, 2018

Operators for merging

Multiple Observables can be merged into one single Observable. The merged Observable subscribes to all of its sources at once and forwards their items as they arrive. Both finite and infinite Observables can be merged.

merge() and mergeWith()

The merge() operator takes two to four Observable<T> sources as arguments and merges them into a single Observable<T>. Larger sets of sources can be passed as an Iterable or through mergeArray().

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> mySource1 = Observable.just("a", "b", "c");
        Observable<String> mySource2 = Observable.just("x", "y", "z");

        Observable.merge(mySource1, mySource2)
                  .subscribe(System.out::println);
    }
}

The mergeWith() operator does the same when called on an existing Observable:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> mySource1 = Observable.just("a", "b", "c");
        Observable<String> mySource2 = Observable.just("x", "y", "z");

        mySource1.mergeWith(mySource2)
                 .subscribe(System.out::println);
    }
}
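
To compare the two forms side by side, the sketch below collects the merged items into a list instead of printing them. The class and method names are only illustrative, and toList() with blockingGet() is used here simply to wait for the result. Because both sources are cold and emit synchronously on the calling thread, both variants should return the items of the first source followed by the items of the second:

```java
import io.reactivex.Observable;

import java.util.List;

class MergeEquivalenceSketch {

    // Merges the two sources with the static merge() factory and collects the result.
    static List<String> viaMerge() {
        Observable<String> first = Observable.just("a", "b", "c");
        Observable<String> second = Observable.just("x", "y", "z");
        return Observable.merge(first, second).toList().blockingGet();
    }

    // Merges the same two sources with the mergeWith() instance operator.
    static List<String> viaMergeWith() {
        Observable<String> first = Observable.just("a", "b", "c");
        Observable<String> second = Observable.just("x", "y", "z");
        return first.mergeWith(second).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(viaMerge());     // [a, b, c, x, y, z]
        System.out.println(viaMergeWith()); // [a, b, c, x, y, z]
    }
}
```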

These two operators subscribe to all sources at once and pass items downstream as they arrive. The merged output is therefore only likely to follow the order of the sources when they are cold Observables that emit on the same thread.
The order of the emissions within each source Observable is always maintained, but you should not rely on any ordering between the source Observables when using merge operators:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<String> mySource1 = Observable.just("a", "b", "c");
        Observable<String> mySource2 = Observable.just("x", "y", "z");

        mySource1.subscribeOn(Schedulers.newThread())
                 .mergeWith(mySource2)
                 .subscribe(System.out::println);

        Thread.sleep(1000);
    }
}
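
The ordering guarantee can also be checked programmatically. The following is a minimal sketch with illustrative names, assuming both sources are moved to their own thread with subscribeOn(). The interleaving of the two sources may change from run to run, but filtering the merged list by source should always give back each source's items in their original order:

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MergeOrderingSketch {

    static final List<String> LETTERS_1 = Arrays.asList("a", "b", "c");
    static final List<String> LETTERS_2 = Arrays.asList("x", "y", "z");

    // Merges two sources that each emit on their own thread and collects every item.
    static List<String> mergedOnSeparateThreads() {
        Observable<String> source1 = Observable.fromIterable(LETTERS_1)
                                               .subscribeOn(Schedulers.newThread());
        Observable<String> source2 = Observable.fromIterable(LETTERS_2)
                                               .subscribeOn(Schedulers.newThread());
        return Observable.merge(source1, source2).toList().blockingGet();
    }

    // Keeps only the items of 'merged' that belong to 'source', in merged order.
    static List<String> onlyFrom(List<String> merged, List<String> source) {
        return merged.stream().filter(source::contains).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> merged = mergedOnSeparateThreads();
        System.out.println("merged (interleaving may vary): " + merged);
        System.out.println("source1 items: " + onlyFrom(merged, LETTERS_1));
        System.out.println("source2 items: " + onlyFrom(merged, LETTERS_2));
    }
}
```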

Multiple sources can also be passed as an Iterable, such as a List:

import io.reactivex.Observable;

import java.util.Arrays;

public class Main {

    public static void main(String[] args) {
        Observable<String> mySource1 = Observable.just("a", "b", "c");
        Observable<String> mySource2 = Observable.just("x", "y", "z");

        Observable.merge(Arrays.asList(mySource1, mySource2))
                  .subscribe(System.out::println);
    }
}
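
When there are more than four sources and building a List feels heavy, the varargs factory mergeArray() is another option. The sketch below is only an illustration (the class name and the single-item sources are made up for the example). Since every source is cold and emits synchronously, the items should come out in source order:

```java
import io.reactivex.Observable;

import java.util.List;

class MergeArraySketch {

    // Merges five single-item sources through the varargs mergeArray() factory.
    // The annotation silences the unchecked warning that generic varargs can trigger.
    @SuppressWarnings("unchecked")
    static List<String> mergeFive() {
        return Observable.mergeArray(
                                 Observable.just("a"),
                                 Observable.just("b"),
                                 Observable.just("c"),
                                 Observable.just("d"),
                                 Observable.just("e"))
                         .toList()
                         .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(mergeFive()); // [a, b, c, d, e]
    }
}
```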

Infinite sources can be merged too, and finite and infinite sources can be mixed in the same merge() call:

import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<String> mySource1 = Observable.just("a", "b", "c");
        Observable<String> mySource2 = Observable.just("x", "y", "z");

        Observable<String> myInfiniteSource1 = Observable.interval(1, TimeUnit.SECONDS)
                                                         .map(x -> x + 1)
                                                         .map(x -> String.format("Infinite Source1: '%s' seconds passed", x));

        Observable<String> myInfiniteSource2 = Observable.interval(400, TimeUnit.MILLISECONDS)
                                                         .map(x -> (x + 1) * 400)
                                                         .map(x -> String.format("Infinite Source2: '%s' milliseconds passed", x));

        Observable.merge(myInfiniteSource1, mySource1, myInfiniteSource2, mySource2)
                  .subscribe(System.out::println);

        Thread.sleep(5500);
    }
}
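
The example above relies on Thread.sleep() to keep the main thread alive, because interval() emits on a background scheduler thread. As an alternative sketch (not part of the original example), the merged stream can be bounded with take() and collected with a blocking call, so that the program ends as soon as enough items have arrived. The class name, the 100 ms period and the limit of five items are assumptions chosen for illustration:

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

class BoundedMergeSketch {

    // Merges an infinite interval source with a finite source and stops after 'limit' items.
    static List<String> firstItems(long limit) {
        Observable<String> finite = Observable.just("a", "b", "c");
        Observable<String> ticks = Observable.interval(100, TimeUnit.MILLISECONDS)
                                             .map(i -> "tick " + i);

        return Observable.merge(ticks, finite)
                         .take(limit)    // disposes the infinite source once the limit is reached
                         .toList()
                         .blockingGet(); // blocks the caller until 'limit' items have arrived
    }

    public static void main(String[] args) {
        // The finite items typically arrive first, since the first tick is delayed by 100 ms.
        firstItems(5).forEach(System.out::println);
    }
}
```

Without take(), the toList() call would never complete, because an infinite source never signals completion.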
András Döbröntey
