RxJava Operators: distinct(), distinctUntilChanged(), elementAt()

October 18, 2018

distinct()

The distinct() operator emits each item only once and suppresses every later duplicate. Whether two items count as duplicates is decided by the hashCode() and equals() implementations of the emitted objects.

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "a", "b", "b", "c", "c");

        // Prints a, b, c (one item per line)
        myObservable.distinct()
                    .subscribe(System.out::println);
    }
}
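To illustrate why the equality implementation matters, here is a minimal plain-Java sketch. It does not use RxJava; it only relies on a HashSet, which, according to the RxJava Javadoc, is what distinct() uses by default to remember the items it has already seen. The names EqualityMatters, RawTag, ValueTag, and countDistinct are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Hypothetical illustration, not part of RxJava.
final class EqualityMatters {

    // No equals()/hashCode() override: instances are compared by identity.
    static final class RawTag {
        final String name;
        RawTag(String name) { this.name = name; }
    }

    // Value-based equals()/hashCode(): instances with the same name are equal.
    static final class ValueTag {
        final String name;
        ValueTag(String name) { this.name = name; }

        @Override
        public boolean equals(Object o) {
            return o instanceof ValueTag && ((ValueTag) o).name.equals(name);
        }

        @Override
        public int hashCode() {
            return name.hashCode();
        }
    }

    // Counts how many items a HashSet considers different from each other.
    static <T> int countDistinct(List<T> items) {
        return new HashSet<>(items).size();
    }

    public static void main(String[] args) {
        System.out.println(countDistinct(Arrays.asList(new RawTag("a"), new RawTag("a"))));     // 2
        System.out.println(countDistinct(Arrays.asList(new ValueTag("a"), new ValueTag("a")))); // 1
    }
}
```

If your own classes behave like RawTag, distinct() would treat every instance as unique, so it is usually worth overriding both equals() and hashCode() for types that flow through it.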

You can also pass a key selector (a lambda or a method reference) to distinct(). Uniqueness is then determined by the key the selector returns for each item, not by the item itself:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "aa", "bb", "bbb", "c", "cccc");

        // Prints a, aa, bbb, cccc (the first item of each length, one per line)
        myObservable.distinct(String::length)
                    .subscribe(System.out::println);
    }
}
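The key selector behaviour can be modelled in a few lines of plain Java. This is only a sketch of the semantics on a List, not RxJava's implementation; DistinctSketch and distinctBy are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper, not part of RxJava: models distinct(keySelector) on a List.
final class DistinctSketch {

    static <T, K> List<T> distinctBy(List<T> items, Function<T, K> keySelector) {
        Set<K> seenKeys = new HashSet<>();   // every key seen so far is remembered
        List<T> result = new ArrayList<>();
        for (T item : items) {
            // Set.add() returns false when the key is already present
            if (seenKeys.add(keySelector.apply(item))) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "aa", "bb", "bbb", "c", "cccc");
        System.out.println(distinctBy(items, String::length)); // [a, aa, bbb, cccc]
    }
}
```

Note that the set of seen keys only ever grows in this sketch. The same concern applies to distinct() on long-running streams with many different keys, where memory use can become an issue.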

distinctUntilChanged()

The distinctUntilChanged() operator ignores consecutive duplicate emissions.
If the same item is emitted several times in a row, only the first one is passed on and the repeats are dropped until a different item arrives. The same rule then applies to that new item. Unlike distinct(), an item that was emitted earlier can be emitted again once a different item has come in between.

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "a", "a", "b", "b", "a", "c", "a");

        // Prints a, b, a, c, a (one item per line)
        myObservable.distinctUntilChanged()
                    .subscribe(System.out::println);
    }
}

Just like with distinct(), you can pass a key selector to distinctUntilChanged(). Consecutive items are then compared by their keys:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "aa", "aa", "b", "bb", "aa", "aa", "aa");

        // Prints a, aa, b, bb (one item per line)
        myObservable.distinctUntilChanged(String::length)
                    .subscribe(System.out::println);
    }
}
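The difference from distinct() is that only the previous key has to be remembered, not every key seen so far. The following plain-Java sketch models this on a List; it is an illustration of the semantics under that assumption, not RxJava's code, and UntilChangedSketch and distinctUntilChangedBy are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper, not part of RxJava: models distinctUntilChanged(keySelector) on a List.
final class UntilChangedSketch {

    static <T, K> List<T> distinctUntilChangedBy(List<T> items, Function<T, K> keySelector) {
        List<T> result = new ArrayList<>();
        boolean first = true;
        K previousKey = null;                // only the most recent key is kept
        for (T item : items) {
            K key = keySelector.apply(item);
            // Keep the item if it is the first one or its key differs from the previous key
            if (first || !Objects.equals(previousKey, key)) {
                result.add(item);
            }
            previousKey = key;
            first = false;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "aa", "aa", "b", "bb", "aa", "aa", "aa");
        System.out.println(distinctUntilChangedBy(items, String::length)); // [a, aa, b, bb]
    }
}
```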

elementAt()

The elementAt() operator emits only the item at the given zero-based index.
It returns a Maybe, which emits the item if the index is valid, or completes without emitting anything if the source ends before that index is reached.

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c", "d", "e");

        myObservable.elementAt(2)
                    .subscribe(x -> System.out.println("Element: " + x),             //onSuccess (this will be called)
                               Throwable::printStackTrace,                           //onError
                               () -> System.out.println("Done from elementAt(2)"));  //onComplete

        myObservable.elementAt(42)
                    .subscribe(x -> System.out.println("Element: " + x),             //onSuccess
                               Throwable::printStackTrace,                           //onError
                               () -> System.out.println("Done from elementAt(42)")); //onComplete (this will be called)
    }
}

If you want to return a default item in case the given index is out of range, you can pass this default item as a second parameter to elementAt():

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c", "d", "e");

        myObservable.elementAt(2)
                    .subscribe(x -> System.out.println("Element: " + x),             //onSuccess (this will be called)
                               Throwable::printStackTrace,                           //onError
                               () -> System.out.println("Done from elementAt(2)"));  //onComplete

        myObservable.elementAt(42, "<Default Item>")                                 //this returns a Single!
                    .subscribe(x -> System.out.println("Element: " + x),             //onSuccess (this will be called)
                               Throwable::printStackTrace);                          //onError
    }
}

Note that when you pass a default item to elementAt(), it returns a Single instead of a Maybe, because there is always exactly one item to emit.

If you want an error to be signaled when the index is out of range, use the elementAtOrError() operator. It also returns a Single, and signals a NoSuchElementException when the source emits fewer items than the index requires:

import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable<String> myObservable = Observable.just("a", "b", "c", "d", "e");

        myObservable.elementAtOrError(2)
                    .subscribe(x -> System.out.println("Element: " + x),             //onSuccess (this will be called)
                               Throwable::printStackTrace);                          //onError


        myObservable.elementAtOrError(42)
                    .subscribe(x -> System.out.println("Element: " + x),             //onSuccess
                               Throwable::printStackTrace);                          //onError (this will be called)
    }
}
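To compare the three variants side by side, here is a rough plain-Java analogy that works on a List instead of an Observable. Optional stands in for Maybe, and a plain return value stands in for Single. ElementAtSketch and its methods are hypothetical names, and the sketch only mirrors the outcomes described above, not RxJava's actual implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical helper, not part of RxJava: models the three elementAt variants on a List.
final class ElementAtSketch {

    // Analogue of elementAt(index): like a Maybe, the result may be empty.
    static <T> Optional<T> elementAt(List<T> items, int index) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
        }
        return index < items.size() ? Optional.of(items.get(index)) : Optional.empty();
    }

    // Analogue of elementAt(index, defaultItem): like a Single, there is always a value.
    static <T> T elementAt(List<T> items, int index, T defaultItem) {
        return elementAt(items, index).orElse(defaultItem);
    }

    // Analogue of elementAtOrError(index): a value, or a NoSuchElementException.
    static <T> T elementAtOrError(List<T> items, int index) {
        return elementAt(items, index).orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(elementAt(items, 2));                    // Optional[c]
        System.out.println(elementAt(items, 42));                   // Optional.empty
        System.out.println(elementAt(items, 42, "<Default Item>")); // <Default Item>
    }
}
```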

 

András Döbröntey
