RxJava Multicasting

November 15, 2018

Multicasting

Multicasting means consolidating the emissions into a single stream that is shared by all Observers, so every Observer receives the same items.
In the post about hot and cold Observables, we have already seen an example of multicasting:

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        ConnectableObservable<String> myObservable = Observable.just("A", "B", "C", "D", "E").publish();

        myObservable.subscribe(s -> System.out.println("Observer 1: " + s));
        myObservable.subscribe(s -> System.out.println("Observer 2: " + s));

        myObservable.connect();
    }
}

The ConnectableObservable forces the emissions from the source to become hot: nothing is emitted until connect() is called, and each emitted item is then sent to all subscribed Observers at the same time.
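What publish() and connect() do can be modelled in a few lines of plain Java. The sketch below is only an illustration under simplifying assumptions: SimpleConnectable and MulticastSketch are hypothetical names, not part of RxJava, and the model ignores completion, errors and threading. It shows that subscribing only registers an Observer, that connect() walks the source once for everybody, and that an Observer arriving after connect() has missed the items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ConnectableObservable; NOT RxJava's implementation.
class SimpleConnectable<T> {
    private final List<T> source;
    private final List<Consumer<T>> observers = new ArrayList<>();

    SimpleConnectable(List<T> source) {
        this.source = source;
    }

    // Like subscribe(): only registers the Observer, nothing is emitted yet.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // Like connect(): walks the source once and hands each item to every registered Observer.
    void connect() {
        for (T item : source) {
            for (Consumer<T> observer : observers) {
                observer.accept(item);
            }
        }
    }
}

class MulticastSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleConnectable<String> hub = new SimpleConnectable<>(Arrays.asList("A", "B", "C"));

        hub.subscribe(s -> log.add("Observer 1: " + s));
        hub.subscribe(s -> log.add("Observer 2: " + s));
        hub.connect();

        // Too late in this model: the items were already pushed during connect().
        hub.subscribe(s -> log.add("Observer 3: " + s));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it prints the items interleaved (Observer 1: A, Observer 2: A, Observer 1: B, and so on), and nothing for Observer 3.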

Multicasting behaviour

To understand how multicasting works when other operators are used, let us consider this example application: the Observable generates 3 random numbers, which should be multicast to two Observers.

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.concurrent.ThreadLocalRandom;

public class Main {

    public static void main(String[] args)  {
        ConnectableObservable<Integer> mySource = Observable.range(1, 3)
                                                            .map(x -> ThreadLocalRandom.current().nextInt(1, 100))
                                                            .publish();

        mySource.subscribe(x -> System.out.println(String.format("Observer 1: %d", x)));

        mySource.subscribe(x -> System.out.println(String.format("Observer 2: %d", x)));

        mySource.connect();
    }
}

One thing worth noting is that the order of the operators is important. If publish() were called first and map() after it, the multicasting would happen before the map() operator. The emissions from Observable.range() would be consolidated into one stream of emissions, but each Observer would then get its own separate stream at map(), and therefore its own random numbers.
Everything before publish() is consolidated into a single stream of emissions, but after publish() the stream forks into separate streams for each Observer.

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.concurrent.ThreadLocalRandom;

public class Main {

    public static void main(String[] args)  {
        ConnectableObservable<Integer> range = Observable.range(1, 3)
                                                         .publish();

        Observable<Integer> mySource = range.map(x -> ThreadLocalRandom.current().nextInt(1, 100));


        mySource.subscribe(x -> System.out.println(String.format("Observer 1: %d", x)));

        mySource.subscribe(x -> System.out.println(String.format("Observer 2: %d", x)));

        range.connect();
    }
}
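The effect of the operator order can be made visible by counting how often the mapping function runs. The following is a minimal plain-Java model, not RxJava code: OperatorOrderSketch, mapThenShare and shareThenMap are hypothetical names, and a simple counter stands in for the random number generator so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

class OperatorOrderSketch {

    // Models map() before publish(): each source item is mapped once and the result is shared.
    static int mapThenShare(List<Integer> source, Function<Integer, Integer> mapper,
                            List<Consumer<Integer>> observers) {
        int mapperCalls = 0;
        for (Integer item : source) {
            Integer mapped = mapper.apply(item);
            mapperCalls++;
            for (Consumer<Integer> observer : observers) {
                observer.accept(mapped);
            }
        }
        return mapperCalls;
    }

    // Models publish() before map(): the source item is shared, but every Observer runs its own map().
    static int shareThenMap(List<Integer> source, Function<Integer, Integer> mapper,
                            List<Consumer<Integer>> observers) {
        int mapperCalls = 0;
        for (Integer item : source) {
            for (Consumer<Integer> observer : observers) {
                observer.accept(mapper.apply(item));
                mapperCalls++;
            }
        }
        return mapperCalls;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        List<Consumer<Integer>> observers = new ArrayList<>();
        observers.add(first::add);
        observers.add(second::add);

        // Stand-in for the random generator: returns a new value on every call.
        AtomicInteger counter = new AtomicInteger();
        Function<Integer, Integer> mapper = x -> counter.incrementAndGet();

        int calls = mapThenShare(source, mapper, observers);
        System.out.println("map before publish: " + calls + " calls, " + first + " / " + second);

        first.clear();
        second.clear();
        counter.set(0);

        calls = shareThenMap(source, mapper, observers);
        System.out.println("publish before map: " + calls + " calls, " + first + " / " + second);
    }
}
```

In this model the mapper runs 3 times when it sits before the sharing point and both Observers see 1, 2, 3. When it sits after the sharing point it runs 6 times, and the Observers see different values (1, 3, 5 and 2, 4, 6).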

Benefits of Multicasting

Multicasting is a good tool to avoid redundant, resource-intensive work being done for multiple Observers.
On the other hand, making Observables hot means that the timing of the connect() call is critical: Observers that subscribe after it might miss emitted items.
As a best practice, it is worth finding the point in the operator chain where multicasting can be used and the emitted items can be consolidated into a single stream of items. This point is usually where the operators common to all Observers end and the chains start to diverge.
For example, if both Observers are interested in the sum of the random numbers, the sum can be calculated before publish():

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.concurrent.ThreadLocalRandom;

public class Main {

    public static void main(String[] args)  {
        ConnectableObservable<Integer> mySource = Observable.range(1, 3)
                                                            .map(x -> ThreadLocalRandom.current().nextInt(1, 100))
                                                            .reduce(0, (total, nextNumber) -> total + nextNumber)
                                                            // reduce() with a seed returns a Single; convert it back to an Observable before publishing
                                                            .toObservable()
                                                            .publish();

        mySource.subscribe(x -> System.out.println(String.format("Observer 1: %d", x)));

        mySource.subscribe(x -> System.out.println(String.format("Observer 2: %d", x)));

        mySource.connect();
    }
}

On the other hand, if one Observer must calculate the sum and the other must print out the items, they fork into different streams after the generation of the random integers:

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.concurrent.ThreadLocalRandom;

public class Main {

    public static void main(String[] args)  {
        ConnectableObservable<Integer> mySource = Observable.range(1, 3)
                                                            .map(x -> ThreadLocalRandom.current().nextInt(1, 100))
                                                            .publish();

        mySource.reduce(0, (total, nextNumber) -> total + nextNumber)
                .subscribe(x -> System.out.println(String.format("Observer 1: %d", x)));

        mySource.subscribe(x -> System.out.println(String.format("Observer 2: %d", x)));

        mySource.connect();
    }
}
András Döbröntey
