RxJava Operators: filter(), take(), skip()

October 16, 2018

To keep the code as reactive as possible, business logic should be implemented and executing using RxJava operators.
These operators are Observers subscribing to the Observable they are called on: they transform the emissions and emit items downstream to other Observers which might be other operators or a final Observer.

There are many operators in RxJava, in this post we will take a look at filter(), take() and skip().

filter()

The filter method takes a Predicate parameter which will provide a mapping for each emission to a Boolean value. The items which result in false will be filtered.
For example to get the numbers which are even, we could use the filter operator:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.range(0, 10);

        myObservable.filter(x -> x % 2 == 0)
                    .subscribe(System.out::println);
    }
}

take()

The take() operator can be used in two ways.
The first is to emit only the first n items. For example to get only the first 3 emissions:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.range(0, 10);

        myObservable.take(3)
                    .subscribe(System.out::println);
    }
}

If there are less items than what you have specified in the take() operator, you will get all items and then the onComplete() is called:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.range(0, 10);

        myObservable.take(15)
                    .subscribe(System.out::println);
    }
}

This version of the take operator works well if you can specify how many emissions you need. However, if you don’t want to state the exact number of emissions, but take all emissions during a given time period, you can still use take().
For example if you have emissions every 200 miliseconds, you could take the emissions only for 2 seconds:

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<Long> myObservable = Observable.interval(200, TimeUnit.MILLISECONDS);

        myObservable.take(2, TimeUnit.SECONDS)
                    .subscribe(System.out::println);

        Thread.sleep(5500);
    }
}

If you need the last emissions, you could use the takeLast() operator, which works similarly.

skip()

The skip() operator will skip the first n emissions before emitting the other items. You can think about it like an opposite of the take() operator.
For example if you want to skip the first 5 emissions:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<Integer> myObservable = Observable.range(0, 10);

        myObservable.skip(5)
                    .subscribe(System.out::println);
    }
}

Similarly to the take() operator, a time duration can be also used as parameter:

import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Observable<Long> myObservable = Observable.interval(100, TimeUnit.MILLISECONDS);

        myObservable.skip(1, TimeUnit.SECONDS)
                .subscribe(System.out::println);

        Thread.sleep(2000);
    }
}

Also similarly to the take() operator, you can use skipLast() as well.

András Döbröntey

About the Author

András Döbröntey

Leave a Comment:

Bitnami