RxJava Operators: delay(), repeat(), repeatWhen()

October 23, 2018

delay()

With the help of the delay() operator emissions can be shifted forward in time.

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<String> myObservable = Observable.just("a", "b", "c");

        myObservable.delay(2, TimeUnit.SECONDS)
                    .subscribe(System.out::println);

        Thread.sleep(4000);
    }
}

It is possible to delay the error notifications as well, you just need to pass a boolean value as a third parameter.

repeat()

The repeat() operator emits items multiple times after onComplete() was called. In essence, this operator resubscribes when it receives onCompleted().

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c");

        myObservable.repeat(3)
                    .subscribe(System.out::println);
    }
}

repeatWhen()

It is possible to combine repeat() and delay() with the help of the repeatWhen() operator which allows you to specify a custom logic for retries.
For example if you want to repeat indefinitely after a 3 second delay:

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<String> myObservable = Observable.just("a", "b", "c");

        myObservable.repeatWhen(completed -> completed.delay(3, TimeUnit.SECONDS))
                    .subscribe(System.out::println);

        Thread.sleep(6000);
    }
}

In case you want to retry for a finite number of times:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<String> myObservable = Observable.just("a", "b", "c");

        myObservable.repeatWhen(completed -> completed.zipWith(Observable.range(1, 3), (n, i) -> i))
                    .subscribe(System.out::println);

        Thread.sleep(3000);
    }
}

Of course you can combine this with a delay to get increasingly greater delays between retries:

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<String> myObservable = Observable.just("a", "b", "c");

        myObservable.repeatWhen(completed -> completed.zipWith(Observable.range(1, 3), (n, i) -> i)
                                                      .flatMap(retryCount -> Observable.timer(retryCount, TimeUnit.SECONDS)))
                    .subscribe(System.out::println);

        Thread.sleep(8000);
    }
}

 

András Döbröntey

About the Author

András Döbröntey

Leave a Comment:

Bitnami