Differences between RxJava’s Observables and Java’s Streams

October 8, 2018

Stream processing libraries such as RxJava’s Observables and Java’s Streams offer quite similar APIs for building pipelines.
They differ mainly in how they handle multi-threading and in how those pipelines are composed.

Main differences

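  • Streams are pull-based: nothing happens until a terminal operation pulls elements through the pipeline. Observables are push-based: the source pushes items to its subscribers as they are produced.
  • A Stream can be consumed only once; an Observable can be subscribed to multiple times.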
  • Using a custom thread pool with Stream.parallel() is a bit complicated and requires a trick:
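    ForkJoinPool forkJoinPool = new ForkJoinPool(2);
    
    // The terminal operation (forEach) has to run inside the submitted task.
    // A Stream is lazy, so a pipeline that is only built inside the pool
    // would still be executed by the thread that calls forEach.
    // Note: get() throws InterruptedException and ExecutionException.
    forkJoinPool.submit(() ->
      IntStream.range(1, 1_000).parallel().filter(i -> i % 75 == 0)
        .forEach(System.out::println)
    ).get();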

    This trick is based on the ForkJoinTask.fork() method:
    “Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()”

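    RxJava, on the other hand, lets you pick the Scheduler explicitly: subscribeOn() and observeOn() take one as a parameter, and many time-based operators have an overload that accepts a Scheduler: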

    // RxJava 2 package names; RxJava 3 uses io.reactivex.rxjava3.core and io.reactivex.rxjava3.schedulers
    import io.reactivex.Observable;
    import io.reactivex.schedulers.Schedulers;
    import java.util.stream.IntStream;

    public class Main {
     public static void main(String[] args) throws InterruptedException {
    
      //Create input for the Observable
      Integer[] integers = IntStream.range(1, 1_000).boxed().toArray(Integer[]::new);
      Observable<Integer> myObservable = Observable.fromArray(integers);
    
      //Run the subscription on a newly created thread (one thread per subscriber, not one per item)
      myObservable
        .subscribeOn(Schedulers.newThread())
        .filter(i -> i % 75 == 0)
        .subscribe(System.out::println);
    
      //The execution above is asynchronous, so the main thread has to wait; otherwise the application may exit before we see the output
      Thread.sleep(3000);
     }
    }

    Because all parallel Streams in a given JVM share the common fork-join pool by default, Stream.parallel() can affect other modules in your application unless you use the trick shown above.

  • Stream.parallel() splits the stream into partitions to leverage the fork-join pool. Observable.subscribeOn() does not: it moves the execution to another thread, but does not split the sequence.
    With Stream.parallel(), processing may spread over several threads; with Observable.subscribeOn(), the Scheduler you pass decides which thread runs the sequence (see the example above).
  • Streams lack fine-grained control over time-related operations such as Observable.timeInterval() or Observable.window().
    The reason is that Streams are pull-based: elements are processed only when the terminal operation asks for them, so the moment at which an element arrives has no meaning. Observables push items as they occur, which makes timing part of the sequence.
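
The first two points in the list above (pull-based evaluation and single use) can be seen with the JDK alone. The following is a minimal sketch, not taken from the original post; the class name StreamPullDemo and its helper methods are made up for illustration. It records which elements were pulled through the pipeline, then shows that a second terminal operation on the same Stream is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamPullDemo {

    // Records which elements were actually pulled through the pipeline.
    public static final List<Integer> pulled = new ArrayList<>();

    // Building the pipeline does no work yet; elements only flow
    // once a terminal operation pulls them.
    public static Stream<Integer> pipeline() {
        return Stream.of(1, 2, 3, 4, 5)
                .peek(pulled::add)
                .filter(i -> i % 2 == 1);
    }

    // Returns true if a second terminal operation on the same Stream is rejected.
    public static boolean secondUseFails() {
        Stream<Integer> stream = pipeline();
        stream.collect(Collectors.toList());     // first terminal operation consumes the stream
        try {
            stream.collect(Collectors.toList()); // reusing the same Stream object
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Stream<Integer> lazy = pipeline();
        System.out.println("pulled before terminal op: " + pulled);

        // findFirst() short-circuits, so it pulls only as many elements as it needs.
        System.out.println("first odd: " + lazy.findFirst().get());
        System.out.println("pulled after findFirst: " + pulled);

        System.out.println("second use fails: " + secondUseFails());

        // A Supplier hands out a fresh Stream for every use.
        Supplier<Stream<Integer>> fresh = StreamPullDemo::pipeline;
        System.out.println(fresh.get().collect(Collectors.toList()));
    }
}
```

An Observable has no such restriction: each call to subscribe() starts a new subscription, so the Supplier workaround is only needed on the Stream side.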
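
To check which threads a parallel Stream really uses with the custom-pool trick from the list above, one option is to collect the worker thread names. This is a rough sketch under assumptions: the class CustomPoolDemo and the method workerNames are hypothetical, and the thread naming it relies on ("commonPool" appearing in the names of common-pool workers) is a detail of current JDK implementations rather than a documented guarantee.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomPoolDemo {

    // Runs a parallel stream inside the given pool and returns
    // the names of the threads that processed its elements.
    public static Set<String> workerNames(ForkJoinPool pool)
            throws InterruptedException, ExecutionException {
        Set<String> names = ConcurrentHashMap.newKeySet();
        pool.submit(() ->
            IntStream.range(1, 1_000)
                .parallel()
                .filter(i -> i % 75 == 0)
                // The terminal operation runs inside the submitted task,
                // so the stream's subtasks are forked in this pool.
                .forEach(i -> names.add(Thread.currentThread().getName()))
        ).get();
        return names;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // Prints the names of the custom pool's worker threads.
            System.out.println(workerNames(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

If the terminal operation were called outside the submitted task, the work would be done by the calling thread and the common pool instead, which is the interference between modules that the trick is meant to avoid.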
András Döbröntey
