Playing with Reactive Java 2 and Reactive Android (RxJava/RxAndroid)

 

What is Reactive Pattern?

I don’t want to write huge theory here. Because you can browse many more articles around the web.  Simple answer for the question is,

The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods. (from Wiki)

to get more idea about how ReactiveX works, check this site.

 

Even though the pattern is widely used in many programming language, here, we will see how to use them in Android platform. (I will cover iOS later)

 

Why you should use RxJava/RxAndroid in Android Platform ?

It’s totally depends on your project Architecture. If you are looking for kind of Asynchronous programming, You can pattern, you can really consider using this.  You can Completely Replace AsyncTask class using RxAndroid. I will show how to do this later,

RxPattern has 2 major components.

  1. Observable.
  2. Observer.

 

Observable emit/pass values and,

Observer is waiting to receive values from Observables.

 

Observer should be subscribe to MAIN Thread (if you need the  response should be updated to the UI.)  and the Observable should be subscribe to NEW thread.

 

before we deep dive into the examples, let me show how to setup the RxAndroid Project Environment in Android Studio.

 

Create a new project with empty Activity.

 

 

 

add the necessary dependency files in the Gradle file. for RxAndroid the following two libraries are very very important.

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'

 

Version numbers are very very important. You must use the latest version of RxJava and RxAndroid. those a mentioned in  RxAndroid Official Github codebase.

 

 

 

My build.gradle (app module) is,

 

apply plugin: 'com.android.application'

android {
    compileSdkVersion 26
    buildToolsVersion "26.0.2"
    defaultConfig {
        applicationId "com.appsgit.rxjavatesting"
        minSdkVersion 21
        targetSdkVersion 26
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "android.support.test.runner.AndroidJUnitRunner"
    }
    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
}

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
    androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
        exclude group: 'com.android.support', module: 'support-annotations'
    })
    compile 'com.android.support:appcompat-v7:26.+'
    compile 'com.android.support.constraint:constraint-layout:1.0.2'
    testCompile 'junit:junit:4.12'

    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex.rxjava2:rxjava:2.1.0'

    compile 'com.squareup.retrofit2:retrofit:2.3.0'

    compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'

    compile 'com.google.code.gson:gson:2.8.1'
    compile 'com.squareup.retrofit2:converter-gson:2.3.0'

    compile 'com.squareup.okhttp3:logging-interceptor:3.8.0'

}

 

Have you noticed that i have added Retrofit to the project. I will show you some examples how to use Retrofit with RxAndroid in parallel.

 

 

the Activity xml has  button for every examples.

 

 

 

<?xml version="1.0" encoding="utf-8"?>
<FrameLayout xmlns:android="http://schemas.android.com/apk/res/android"
    android:id="@+id/content"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:background="@android:color/holo_blue_bright"
    android:orientation="vertical">

    <LinearLayout
        android:id="@+id/button_container"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:orientation="vertical">

        <LinearLayout
            android:id="@+id/layout1"
            android:layout_width="match_parent"
            android:layout_height="match_parent"
            android:layout_weight="1"
            android:orientation="horizontal">

            <Button
                android:id="@+id/button1"
                android:layout_width="match_parent"
                android:layout_height="match_parent"
                android:layout_marginTop="8dp"
                android:layout_marginStart="8dp"
                android:layout_marginEnd="4dp"
                android:layout_marginBottom="4dp"
                android:layout_weight="1"
                android:foreground="?attr/selectableItemBackgroundBorderless"
                android:text="Observable.just()"
                android:onClick="example1"/>

            <Button
                android:id="@+id/button2"
                
                android:layout_width="match_parent"
                android:layout_height="match_parent"
                android:layout_marginTop="8dp"
                android:layout_marginStart="4dp"
                android:layout_marginEnd="8dp"
                android:layout_marginBottom="4dp"
                android:layout_weight="1"
                android:onClick="example2"
                android:foreground="?attr/selectableItemBackgroundBorderless"
                android:text="Observable.fromArray()" />

        </LinearLayout>

        <LinearLayout
            android:id="@+id/layout2"
            android:layout_width="match_parent"
            android:layout_height="match_parent"
            android:layout_weight="1"
            android:orientation="horizontal">

            <Button
                android:id="@+id/button3"
                
                android:layout_width="match_parent"
                android:layout_height="match_parent"
                android:layout_weight="1"
                android:layout_marginTop="4dp"
                android:layout_marginStart="8dp"
                android:layout_marginEnd="4dp"
                android:layout_marginBottom="4dp"
                android:text="Observable.fromCallable()"
                android:onClick="example3"
                android:foreground="?attr/selectableItemBackgroundBorderless" />

            <Button
                android:id="@+id/button4"
                android:layout_width="match_parent"
                android:layout_height="match_parent"
                android:layout_marginTop="4dp"
                android:layout_marginStart="4dp"
                android:layout_marginEnd="8dp"
                android:layout_marginBottom="4dp"
                android:layout_weight="1"
                android:foreground="?attr/selectableItemBackgroundBorderless"
                android:text="Observable.range()"
                android:onClick="example4"/>

        </LinearLayout>

        <LinearLayout
            android:id="@+id/layout3"
            android:layout_width="match_parent"
            android:layout_height="match_parent"
            android:layout_weight="1"
            android:orientation="horizontal">

            <Button
                android:id="@+id/button5"
                android:layout_width="match_parent"
                android:layout_height="match_parent"
                android:layout_marginBottom="4dp"
                android:layout_marginEnd="4dp"
                android:layout_marginStart="8dp"
                android:layout_marginTop="4dp"
                android:layout_weight="1"
                android:foreground="?attr/selectableItemBackgroundBorderless"
                android:text="Observable.defer()" />

            <Button
                android:id="@+id/button6"
                android:layout_width="match_parent"
                android:layout_height="match_parent"
                android:layout_marginTop="4dp"
                android:layout_marginStart="4dp"
                android:layout_marginEnd="8dp"
                android:layout_marginBottom="4dp"
                android:layout_weight="1"
                android:foreground="?attr/selectableItemBackgroundBorderless"
                android:text="Observable.interval"
                android:onClick="example6"/>

        </LinearLayout>
    </LinearLayout>
</FrameLayout>

 

 

 

 

Example 1 : Observable.just() 

 

/*
    Example 1 is simple example of RxAndroid. Observable.just()  emits/pull out single value one by one.
    observer will receive the value one by one and onComplete() will be called at the end.
*/
public void example1(View view) {
    Observable observable = Observable.fromArray("1", "2", "3","3", "4", "5");

    observable.observeOn(Schedulers.newThread());

    observable.subscribeOn(AndroidSchedulers.mainThread());

    observable.subscribe(new Observer<String>(){
        @Override
        public void onNext(@NonNull String s) {
            //This is going to be running in Main thread.
            Log.d(TAG, "onNext: " + s + "\n"); //this shold print the values one by one
        }

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
        }

        @Override
        public void onError(@NonNull Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: called.");
        }
    });
}

 

 

I am trying to attach the value to the textfield in the activity.

 

Result is,

 

11-01 23:35:24.471 2889-2889/? D/MainActivity: onNext: 1
11-01 23:35:24.471 2889-2889/? D/MainActivity: onNext: 2
11-01 23:35:24.471 2889-2889/? D/MainActivity: onNext: 3
11-01 23:35:24.471 2889-2889/? D/MainActivity: onNext: 3
11-01 23:35:24.471 2889-2889/? D/MainActivity: onNext: 4
11-01 23:35:24.471 2889-2889/? D/MainActivity: onNext: 5
11-01 23:35:24.471 2889-2889/? D/MainActivity: onComplete: called.

 

 

 

Example 2 : Observable.fromArray() 

 

/*
Example 2 also same as like the above. Observable.fromArray()  emits/pull out single value one by one.
observer will receive the value one by one and onComplete() will be called at the end.
*/
public void example2(View view) {
    Observable observable = Observable.fromArray("10", "20", "30","40", "50", "60");

    observable.observeOn(Schedulers.newThread());

    observable.subscribeOn(AndroidSchedulers.mainThread());

    observable.subscribe(new Observer<String>(){
        @Override
        public void onNext(@NonNull String s) {
            //This is going to be running in Main thread.
            Log.d(TAG, "onNext: " + s + "\n"); //this shold print the values one by one
        }

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
        }

        @Override
        public void onError(@NonNull Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: called.");
        }
    });
}

 

console result would be,

 

11-02 15:02:49.987 6171-6171/com.appsgit.rxjavatesting D/MainActivity: onNext: 10
11-02 15:02:49.987 6171-6171/com.appsgit.rxjavatesting D/MainActivity: onNext: 20
11-02 15:02:49.987 6171-6171/com.appsgit.rxjavatesting D/MainActivity: onNext: 30
11-02 15:02:49.987 6171-6171/com.appsgit.rxjavatesting D/MainActivity: onNext: 40
11-02 15:02:49.987 6171-6171/com.appsgit.rxjavatesting D/MainActivity: onNext: 50
11-02 15:02:49.987 6171-6171/com.appsgit.rxjavatesting D/MainActivity: onNext: 60
11-02 15:02:49.987 6171-6171/com.appsgit.rxjavatesting D/MainActivity: onComplete: called.

 

 

 

 

Example 3 : Observable.fromCallable()

this is special kind of Observable. call the method only when an observer subscribe to this. check the console log for more info,

/*
Example 3 will show you how to create Observable.fromCallable() Callable interface.
inside the call() method you can run time consuming tasks..
*/
public void example3(View view) {
    Observable observable = Observable.fromCallable(new Callable() {
        @Override
        public String call() throws Exception {
            Log.d(TAG, "call: called in thee Observable object..");
            //This method can be your background thread. you can run any
            return "test value";
        }
    });

    observable.observeOn(Schedulers.newThread());

    observable.subscribeOn(AndroidSchedulers.mainThread());

    observable.subscribe(new Observer<String>(){
        @Override
        public void onNext(@NonNull String s) {
            //this will be your main thread.
            //should receive "test value" here..
            //This is going to be running in Main thread.
            Log.d(TAG, "onNext: " + s + "\n"); //this shold print the values one by one
        }

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            Log.d(TAG, "onSubscribe: called..");
        }

        @Override
        public void onError(@NonNull Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: called.");
        }
    });
}

console result would be,

 

11-02 15:05:38.234 14974-14974/com.appsgit.rxjavatesting D/MainActivity: onSubscribe: called..
11-02 15:05:38.234 14974-14974/com.appsgit.rxjavatesting D/MainActivity: call: called in the Observable object..
11-02 15:05:38.234 14974-14974/com.appsgit.rxjavatesting D/MainActivity: onNext: test value
11-02 15:05:38.234 14974-14974/com.appsgit.rxjavatesting D/MainActivity: onComplete: called.

 

 

Example 4 : Observable.range(1, 10)

/*
Example 4 will show you how to create Observable.range() Observable will ommit 1 to 10 values..
inside the call() method you can run time consuming tasks..
*/
public void example4(View view) {
    Observable<Integer> observable = Observable.range(1, 10);

    observable.observeOn(Schedulers.newThread());

    observable.subscribeOn(AndroidSchedulers.mainThread());

    observable.subscribe(new Observer<Integer>(){
        @Override
        public void onNext(@NonNull Integer value) {
            Log.d(TAG, "Observable.range onNext: " + value + "\n"); //this shold print the values one by one
        }

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
            Log.d(TAG, "onSubscribe: called..");
        }

        @Override
        public void onError(@NonNull Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: called.");
        }
    });
}

 

console log is,

11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: onSubscribe: called..
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 1
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 2
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 3
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 4
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 5
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 6
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 7
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 8
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 9
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: Observable.range onNext: 10
11-02 15:09:38.031 17967-17967/com.appsgit.rxjavatesting D/MainActivity: onComplete: called.

 

Example 5 : Observable.defer()

 

/*
    Example 5 is simple example of RxAndroid. Observable.defer()  returns an observable .
    observer will receive the value one by one and onComplete() will be called at the end.
*/
public void example5(View view) {

    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(1, 2, 3, 4);
        }
    });
    observable.observeOn(Schedulers.newThread());

    observable.subscribeOn(AndroidSchedulers.mainThread());

    observable.subscribe(new Observer<Integer>(){

        @Override
        public void onSubscribe(@NonNull Disposable disposable) {
        }

        @Override
        public void onNext(@NonNull Integer integer) {
            Log.d(TAG, "Observable.defer onNext: value is " + integer );
        }

        @Override
        public void onError(@NonNull Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: called.");
        }
    });
}

console log is ,

11-02 15:12:54.350 21544-21544/com.appsgit.rxjavatesting D/MainActivity: Observable.defer onNext: value is 1
11-02 15:12:54.350 21544-21544/com.appsgit.rxjavatesting D/MainActivity: Observable.defer onNext: value is 2
11-02 15:12:54.350 21544-21544/com.appsgit.rxjavatesting D/MainActivity: Observable.defer onNext: value is 3
11-02 15:12:54.350 21544-21544/com.appsgit.rxjavatesting D/MainActivity: Observable.defer onNext: value is 4
11-02 15:12:54.350 21544-21544/com.appsgit.rxjavatesting D/MainActivity: onComplete: called.

 

 

Example 6 : Observable.interval()

This is also an interesting Observable. this emits values every hour/minutes or seconds (depends on the TimeUnit we defined.)

   /*
       Observable.interval()  is really an interesting obserable. it can emmit value in every hour/minutes/seconds.
       observer will receive the value one by one and onComplete() will be called at the end.
       the below example will emit value every single minutes..
   */
    public void example6(View view) {

        //we can use any of the commented observables.
//        Observable<Long> observable = Observable.interval(1, 1, TimeUnit.HOURS);
//        Observable<Long> observable = Observable.interval(1, 1, TimeUnit.DAYS);
//        Observable<Long> observable = Observable.interval(1, 1, TimeUnit.MILLISECONDS);

        Observable<Long> observable = Observable.interval(1, 1, TimeUnit.SECONDS);
        
        observable.observeOn(Schedulers.newThread());

        observable.subscribeOn(AndroidSchedulers.mainThread());

        observable.subscribe(new Observer<Long>(){

            @Override
            public void onSubscribe(@NonNull Disposable disposable) {
            }

            @Override
            public void onNext(@NonNull Long value) {
                Log.d(TAG, "Observable.interval onNext: value is " + value );
            }

            @Override
            public void onError(@NonNull Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: called.");
            }
        });
    }

 

 

console logs prints every seconds, Please quit the activity if you want to stop the console printing.

 

11-02 15:17:55.086 25990-26357/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 1
11-02 15:17:55.355 25990-26380/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 0
11-02 15:17:56.086 25990-26357/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 2
11-02 15:17:56.361 25990-26380/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 1
11-02 15:17:57.085 25990-26357/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 3
11-02 15:17:57.355 25990-26380/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 2
11-02 15:17:58.089 25990-26357/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 4
11-02 15:17:58.355 25990-26380/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 3
11-02 15:17:59.086 25990-26357/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 5
11-02 15:17:59.355 25990-26380/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 4
11-02 15:18:00.085 25990-26357/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 6
11-02 15:18:00.355 25990-26380/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 5
11-02 15:18:01.085 25990-26357/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 7
11-02 15:18:01.356 25990-26380/com.appsgit.rxjavatesting D/MainActivity: Observable.interval onNext: value is 6
........

.....

..................

 

if you want to quiet after 10 seconds. you can do something like this in the sixth example,

observable.subscribe(new Observer<Long>(){
    Disposable disposable = null;

    @Override
    public void onSubscribe(@NonNull Disposable disposable) {
        this.disposable = disposable;
    }

    @Override
    public void onNext(@NonNull Long value) {

        Log.d(TAG, "Observable.interval onNext: value is " + value );

        //dispose observer after 10 seconds. We have set to TimeUnit.SECONDS.
        if (disposable != null && value >= 10) {
            disposable.dispose();
        }
    }

    @Override
    public void onError(@NonNull Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: called.");
    }
});

 

 

 

thats all for now. I ill update the post very soon and you can download the sample code from my github

 

Thanks.!

About Zumry

Zumry Mohamed

Self Taught iOS & Android Mobile Application Developer.

Article written by zumrywahid

Self Taught iOS & Android Mobile Application Developer.

Be the first to comment

Leave a Reply

Your email address will not be published. Required fields are marked *