RxJava Operators: concat(), concatWith()

November 8, 2018

concat()

The concat() operator is quite similar to merging, but it does not subscribe to the next Observable until the current one calls onComplete(). This means that the emissions of the concatenated Observables arrive in a guaranteed order: everything from the first source, then everything from the second, and so on.
If you use concat() with an infinite Observable, it keeps emitting from that Observable forever and never reaches the sources that follow it. So if you plan to use an infinite Observable with concat(), either place it last or turn it into a finite Observable with the take() operator.

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
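        Observable<String> mySource1 = Observable.just("a", "b", "c");
        Observable<String> mySource2 = Observable.just("x", "y", "z");

        // mySource2 is subscribed only after mySource1 has completed,
        // so this prints a, b, c, x, y, z (one per line)
        Observable.concat(mySource1, mySource2)
                  .subscribe(System.out::println);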
    }
}
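The point about infinite Observables can be illustrated with a small sketch. It assumes RxJava 2 (the same io.reactivex package used above); the class name ConcatWithTake and the helper collect() are made up for this illustration. Observable.interval() never completes on its own, so take(3) is used to make it finite before it is concatenated with a second source.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ConcatWithTake {

    // Hypothetical helper: builds the concatenated stream and blocks until it completes.
    static List<String> collect() {
        // interval() is infinite; take(3) completes it after three emissions
        Observable<String> ticks = Observable.interval(100, TimeUnit.MILLISECONDS)
                                             .take(3)
                                             .map(i -> "tick " + i);
        Observable<String> letters = Observable.just("x", "y");

        // letters is subscribed only once ticks has called onComplete()
        return Observable.concat(ticks, letters)
                         .toList()
                         .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(collect()); // [tick 0, tick 1, tick 2, x, y]
    }
}
```

Without take(3), the first source would never complete, toList() would never emit, and blockingGet() would block forever.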

concatWith()

It is also possible to use the concatWith() operator on an existing Observable, just as we have seen with merging.
Notice that the order of the emissions stays the same, even though the first source is subscribed on a new thread:

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class Main {

    public static void main(String[] args) throws InterruptedException {
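        Observable<String> mySource1 = Observable.just("a", "b", "c");
        Observable<String> mySource2 = Observable.just("x", "y", "z");

        // mySource1 emits on a new thread; mySource2 still waits for it to complete
        mySource1.subscribeOn(Schedulers.newThread())
                 .concatWith(mySource2)
                 .subscribe(System.out::println);

        // keep the main thread alive until the background emissions are done
        Thread.sleep(1000);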
    }
}
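To make the ordering guarantee more visible, here is a rough comparison of concatWith() and mergeWith(). It is only a sketch, assuming RxJava 2; the class ConcatVsMerge and its helper methods are invented for the example, and delay() is used merely to simulate a slow first source.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ConcatVsMerge {

    // Simulated slow source: its items arrive 200 ms after subscription
    static Observable<String> slow() {
        return Observable.just("a", "b", "c").delay(200, TimeUnit.MILLISECONDS);
    }

    // Fast source: emits immediately on subscription
    static Observable<String> fast() {
        return Observable.just("x", "y", "z");
    }

    // concatWith() waits for slow() to complete before subscribing to fast()
    static List<String> concatenated() {
        return slow().concatWith(fast()).toList().blockingGet();
    }

    // mergeWith() subscribes to both at once, so fast() gets ahead
    static List<String> merged() {
        return slow().mergeWith(fast()).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // [a, b, c, x, y, z]
        System.out.println(merged());       // [x, y, z, a, b, c]
    }
}
```

With concatWith() the slow source still comes first, because the second source is not subscribed until the first one completes. With mergeWith() the emissions appear in whatever order they happen to arrive.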

 

András Döbröntey
