1. Introduction

In this lab, we are going to see how Reactive eXtensions for Java (RX Java) lets you use reactive programming to build concurrent and responsive applications.

This lab offers attendees an intro-level, hands-on session with RX Java, from the first line of code to building a library that exposes an RX Java API. It illustrates what reactive programming is and how to build applications using this paradigm. Yes, it is a paradigm, and it’s quite different from traditional Java development.

This is a BYOL (Bring Your Own Laptop) session, so bring your Windows, OSX, or Linux laptop. You need JDK 8+ on your machine, and Apache Maven (3.5+).

1.1. Prerequisites

We will get to the good stuff, coding all the way, soon… But before we start, we need to install a few pieces of software on our machine.

1.1.1. Hardware

  • Operating System: any reasonably recent version of Windows, OSX, or Linux

  • Memory: at least 4 GB

1.1.2. Java Development Kit

We need JDK 8+ installed on our machine. The latest JDK can be downloaded from:

You can use either Oracle JDK or OpenJDK.

1.1.3. Apache Maven

  1. Download Apache Maven from https://maven.apache.org/download.cgi.

  2. Unzip it to a directory of your choice and add its bin directory to your $PATH.

1.1.4. IDE

We recommend using an IDE. You can use Eclipse IDE, VS Code, IntelliJ IDEA, or NetBeans.

No IDE? If you don’t have one, here are the steps to get started with Eclipse.

  1. First download Eclipse from the download page.

  2. In the Eclipse Package list, select Eclipse IDE for Java Developers. It brings you to a download page with a Download button.

  3. Once downloaded, unzip it.

  4. In the destination directory, you should find an eclipse binary that you can execute.

  5. Eclipse asks you to create a workspace.

  6. Once launched, click on the Workbench arrow (top right corner).

1.2. Let’s start!

First, clone the lab repository:

git clone https://github.com/cescoffier/rxjava2-lab.git

You can import the code into your IDE as a Maven project. You can refer to your IDE documentation to know how to import Maven projects.

For Eclipse:

  1. Click on File - Import… - Maven - Existing Maven Projects

  2. Select the location where you cloned the sources

  3. Click Finish and wait…​

1.3. How to run the different exercises

Maven is only used to retrieve a few dependencies. From the location where you cloned the repository, run:

mvn compile

It will download the required dependencies.

The code is in src/main/java. All the exercises have a public static void main(String... args) method. To run an exercise, just run its main method.

I believe you are ready to start!

2. The reactive thinking: from OOP to streams

Forget everything you know about code, and look around. Modeling this world with code is not easy. As developers, we tend to follow counter-intuitive approaches. Since the 80’s, object-oriented programming has been seen as a silver bullet. Every entity from our world is represented by an object containing fields and exposing methods. Most of the time, interacting with these objects is done using a blocking and synchronous protocol. You invoke a method and wait for a response.

But… the world in which we are living is asynchronous. Interactions are done using events, messages, and stimuli. To overcome the limitations of object orientation, many patterns and paradigms emerged. More recently, functional programming has been making a comeback, not to replace object orientation, but to complement it. Reactive programming is a functional, event-driven programming approach that is used in combination with the regular object-oriented paradigm.

A few years ago, Microsoft created a reactive programming framework for .NET called Reactive eXtensions (also called ReactiveX or RX). RX is an API for asynchronous programming with observable streams. This API has been ported to several languages such as Swift, JavaScript, Python, C++, and Java.

Let’s observe our world for a moment. Observe entities in motion: traffic jams, weather, conversations, financial markets. Things are moving and evolving concurrently. Multiple things happen at the same time, sometimes independently, sometimes in an orchestrated manner. Each object is creating a stream of events. For instance, your mouse cursor position is moving: the sequence of positions is a stream. The number of people in the room may be stable, but someone can come in or go out, generating a new value, so we have another stream of values. There is a fundamental mantra behind reactive programming: events are data and data are events.

But don’t be mistaken, reactive programming is not a silver bullet. Reactive programming and RX let you express business logic in terms of streams of events, helping you with concurrency and error recovery; but don’t think it’s magic… because it’s not. RX gives you superpowers when dealing with asynchronous scenarios, but it does not come without costs.

2.1. "Enough philosophy, I wanna see code"

Alright! Let’s see the code. Can we start with our beloved XML? In the pom.xml file located at the root of the code repository, you can see a few dependencies. One of them is:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

That’s the only dependency you need to start using RX Java.

Now open the src/main/java/me/escoffier/lab/chapter1/Code1.java:

package me.escoffier.lab.chapter1;

import java.util.Arrays;
import java.util.List;

import io.reactivex.*;

public class Code1 {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String... args) {
        Observable<String> stream = Observable.fromIterable(SUPER_HEROES);
        stream.subscribe(
            name -> System.out.println(name)
        );
    }
}

This is your first RX Java application. A couple of points to understand:

  • import io.reactivex.*; imports the classes from RX Java

  • Observable<String>: represents a stream of data (here, String items). Notice the class name: it invites you to observe it.

  • Observable.fromIterable: creates a stream (Observable) from a collection

  • stream.subscribe: declares an observer that consumes the data flowing through the stream. The passed lambda is called for each item.

Run this example and you should see:

Superman
Batman
Aquaman
Asterix
Captain America

A gentle note about subscribe…​ If you don’t subscribe to a stream, nothing happens. None of the processing stages will be executed until you subscribe to it. This is very important to remember to avoid thousands of hours of debugging!

2.2. That’s all?

Ok, not really impressive…​ But the true power of RX Java comes from its set of operators to manipulate the streams. Jump to src/main/java/me/escoffier/lab/chapter1/Code2.java:

package me.escoffier.lab.chapter1;

import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

public class Code2 {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String... args) {
        Observable
            .fromIterable(SUPER_HEROES)
            .map(n -> n.toUpperCase())
            .filter(name -> name.startsWith("A"))
            .subscribe(
                name -> System.out.println(name)
            );
    }
}

This example uses two operators:

  1. map: for each item of the observed stream, apply the function - here transform the name to uppercase

  2. filter: for each item of the observed stream (the uppercase names), select only names starting with an A.

Run this example, you should see:

AQUAMAN
ASTERIX

There is a very important point to make here: operators consume items from a stream and produce a stream. The first part is simple to understand: filter, for instance, receives SUPERMAN, then BATMAN, and so on. The second part is a bit more tricky. Let’s take map as an example:

Input: Superman    Batman     Aquaman ... <- input stream
            |           |        |
Result:  SUPERMAN    BATMAN   AQUAMAN ... <- this is also a stream

It produces one value per received value, in other words, a sequence of values: it’s also a stream.

2.3. Ready to see more

In this chapter, you have seen some very basic RX Java 2. There is a lot more…

3. Observables and Subscribers

3.1. Anatomy of a stream

What’s a stream? A stream is a sequence of data, potentially unbounded. This data may be known or unknown when the stream is created. We will see later that some special streams emit a single item and some none at all. Streams are asynchronous constructs. When you observe a stream, you don’t know when data is going to be emitted.

In a (regular) stream, three types of events can be conveyed, and an observer is notified of each of them through one of the following methods:

  • onNext - this passes each item the observed stream is emitting

  • onComplete - this communicates the end of the stream. onNext won’t be called anymore. This notification does not happen on unbounded streams

  • onError - this communicates that something bad happened up the chain to the observer. Unless there is a retry (we will cover this later), the stream won’t emit any more items. onComplete is not called either in this case.

Time to go back to code. Open the me.escoffier.lab.chapter2.Code1 class, and extend the code to display a message (using System.out.println) when:

  1. an item is emitted

  2. the stream is completed

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

public class Code1_Solution {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String... args) {
        Observable.fromIterable(SUPER_HEROES)
            .doOnNext(s -> System.out.println("Next >> " + s))
            .doOnComplete(() -> System.out.println("Completion"))
            .subscribe();
    }
}

You should get an output like:

Next >> Superman
Next >> Batman
Next >> Aquaman
Next >> Asterix
Next >> Captain America
Completion

Now, let’s see what happens when an error occurs. In me.escoffier.lab.chapter2.Code2, use the doOnComplete, doOnNext and doOnError to print the different events.

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

public class Code2_Solution {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String... args) {
        Observable.fromIterable(SUPER_HEROES)
            .map(name -> {
                if (name.endsWith("x")) {
                    throw new RuntimeException("What a terrible failure!");
                }
                return name.toUpperCase();
            })
            // Use doOnNext, doOnComplete and doOnError to print messages
            // on each item, when the stream completes, and when an error occurs
            .doOnNext(s -> System.out.println(">> " + s))
            .doOnComplete(() -> System.out.println("Completion... not called"))
            .doOnError(err -> System.out.println("Oh no! " + err.getMessage()))
            .subscribe();
    }
}

Notice the output:

>> SUPERMAN
>> BATMAN
>> AQUAMAN
Oh no! What a terrible failure!

Once the error is reached, no more items are sent. In addition, the doOnComplete method is not called.

While the doOnX methods are interesting to understand what’s going on and also implement side-effects (be aware they are not necessarily a good thing), you can also receive these events in the subscriber directly. The subscribe method can:

  • receive the items such as in: stream.subscribe(i -> {});

  • receive the error such as in: stream.subscribe(i -> {}, err -> {});

  • receive the completion event such as in: stream.subscribe(i -> {}, err -> {}, () -> {});

You can also implement the observer interface (io.reactivex.Observer for an Observable) directly. In me.escoffier.lab.chapter2.Code3, use the three-lambda version of the subscribe method to print the different events.

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;

public class Code3_Solution {

    public static void main(String... args) {
        Observable.just("Black Canary", "Catwoman", "Elektra")
            .subscribe(
                name -> System.out.println(">> " + name),
                Throwable::printStackTrace,
                () -> System.out.println("Completion")
            );
    }
}

In this example, the stream is created with the just method, which takes the items to emit as parameters.

3.2. Creating more dynamic streams

So far, we have always used streams with a fixed set of values. Of course, the items emitted by a stream may be unknown at creation time. But first, let’s see how streams work.

The create method takes a function as a parameter; this function is called on every subscription (so once for every subscriber). The following code shows how this method is used:

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;

public class Code4 {

    public static void main(String... args) {
        Observable<String> stream = Observable.create(subscriber -> {
            // Emit items
            subscriber.onNext("Black Canary");
            subscriber.onNext("Catwoman");
            subscriber.onNext("Elektra");
            // Notify the completion
            subscriber.onComplete();
        });

        stream
            .subscribe(
                i -> System.out.println("Received: " + i),
                err -> System.out.println("BOOM"),
                () -> System.out.println("Completion")
            );

    }
}

Now open the me.escoffier.lab.chapter2.Code5 class and extend it to inject a failure between two emissions.

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;

public class Code5_Solution {

    public static void main(String... args) {
        Observable<String> stream = Observable.create(subscriber -> {
            // Emit items
            subscriber.onNext("Black Canary");
            subscriber.onNext("Catwoman");
            // Inject an error
            subscriber.onError(new Exception("What a terrible failure"));
            subscriber.onNext("Elektra");
            // Notify the completion
            subscriber.onComplete();
        });

        stream
            .subscribe(
                i -> System.out.println("Received: " + i),
                err -> System.out.println("BOOM"),
                () -> System.out.println("Completion")
            );

    }
}

When you run it, you can notice the same behavior as before: once an error is injected, the other values are not received by the subscriber (even if the stream continues emitting items). This is because, once the error is received, the subscription is canceled, meaning that it does not observe the events anymore.
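The rule described above (nothing is delivered after a terminal event) can be sketched without RX Java. The following is a minimal plain-JDK illustration under stated assumptions, not RX Java’s actual implementation: SimpleObserver, GuardedEmitter and run are hypothetical names introduced only for this sketch, and the emitted values mirror the Code5 exercise.

```java
import java.util.ArrayList;
import java.util.List;

class TerminalEventSketch {

    /** Hypothetical observer with the three notifications used in this lab. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    /** Hypothetical wrapper that drops every signal following a terminal event. */
    static final class GuardedEmitter<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> downstream;
        private boolean terminated;

        GuardedEmitter(SimpleObserver<T> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onNext(T item) {
            if (!terminated) {
                downstream.onNext(item);
            }
        }

        @Override
        public void onError(Throwable error) {
            if (!terminated) {
                terminated = true; // an error ends the stream
                downstream.onError(error);
            }
        }

        @Override
        public void onComplete() {
            if (!terminated) {
                terminated = true; // completion ends the stream too
                downstream.onComplete();
            }
        }
    }

    /** Replays the emissions of the Code5 exercise and records what the observer sees. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        GuardedEmitter<String> emitter = new GuardedEmitter<>(new SimpleObserver<String>() {
            @Override
            public void onNext(String item) {
                log.add("Received: " + item);
            }

            @Override
            public void onError(Throwable error) {
                log.add("BOOM");
            }

            @Override
            public void onComplete() {
                log.add("Completion");
            }
        });

        emitter.onNext("Black Canary");
        emitter.onNext("Catwoman");
        emitter.onError(new Exception("What a terrible failure"));
        emitter.onNext("Elektra"); // dropped: the stream already failed
        emitter.onComplete();      // dropped as well
        return log;
    }

    public static void main(String... args) {
        run().forEach(System.out::println);
    }
}
```

The producer keeps calling onNext and onComplete after the failure, but the guard swallows those calls, so the observer only records the two items and the error.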

Let’s see another example to exhibit this behavior:

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;

import java.util.Scanner;

public class Code6 {

    public static void main(String... args) {
        Observable<String> stream = Observable.create(subscriber -> {
            boolean done = false;
            Scanner scan = new Scanner(System.in);
            while(! done) {
                String input = scan.next();
                if (input.contains("done")) {
                    done = true;
                    subscriber.onComplete();
                } else if (input.contains("error")) {
                    subscriber.onError(new Exception(input));
                } else {
                    subscriber.onNext(input);
                }
            }
        });

        stream
            .subscribe(
                i -> System.out.println("Received: " + i),
                err -> System.out.println("BOOM"),
                () -> System.out.println("Completion")
            );

    }
}

This application reads user input and builds a stream out of it: every entry is emitted as an item (Scanner.next() reads whitespace-separated tokens). If an entry contains "error", an error is injected. If it contains "done", the stream is completed. If you run the application and enter hello, foo, error, and then any further input (which is no longer received), you get the following output:

hello
Received: hello
foo
Received: foo
error
BOOM

3.3. Hot vs. Cold streams

Things are becoming a bit more subtle…​ This is a key concept to grasp before going further. Streams can be cold or hot (like coffee).

3.3.1. Cold streams

A cold stream restarts from the beginning for each subscriber, and every subscriber gets the full set of items. For instance, let’s have a look at the following code:

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;

import java.util.Scanner;

public class Code7 {

    public static void main(String... args) {
        Observable<String> stream = Observable.just("Black Canary", "Catwoman", "Elektra");

        stream
            .subscribe(
                i -> System.out.println("[A] Received: " + i),
                err -> System.out.println("[A] BOOM"),
                () -> System.out.println("[A] Completion")
            );

        stream
            .subscribe(
                i -> System.out.println("[B] Received: " + i),
                err -> System.out.println("[B] BOOM"),
                () -> System.out.println("[B] Completion")
            );

    }
}

When running this application (me.escoffier.lab.chapter2.Code7) you get:

[A] Received: Black Canary
[A] Received: Catwoman
[A] Received: Elektra
[A] Completion
[B] Received: Black Canary
[B] Received: Catwoman
[B] Received: Elektra
[B] Completion

As you can see, both subscribers get the full set of items.
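Why does every subscriber get everything? One way to picture a cold stream is as a producer function that is re-run for each subscription and does nothing until then. The sketch below uses only the JDK and is an assumption-laden simplification, not how RX Java is implemented: ColdSource and run are hypothetical names, and completion and error signals are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ColdSourceSketch {

    /** Hypothetical cold source: the producer function runs once per subscriber. */
    static final class ColdSource<T> {
        private final Consumer<Consumer<T>> producer;

        ColdSource(Consumer<Consumer<T>> producer) {
            this.producer = producer;
        }

        void subscribe(Consumer<T> observer) {
            // Nothing has been emitted until now: subscribing triggers the producer.
            producer.accept(observer);
        }
    }

    /** Creates one source, subscribes twice, and records the order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ColdSource<String> source = new ColdSource<String>(observer -> {
            log.add("producer started");
            observer.accept("Black Canary");
            observer.accept("Catwoman");
            observer.accept("Elektra");
        });

        log.add("source created"); // logged before any "producer started" entry
        source.subscribe(item -> log.add("[A] Received: " + item));
        source.subscribe(item -> log.add("[B] Received: " + item));
        return log;
    }

    public static void main(String... args) {
        run().forEach(System.out::println);
    }
}
```

The log contains "producer started" twice, once per subscriber, and never before the first subscribe call. This is also why nothing happens when you forget to subscribe.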

3.3.2. Hot streams

Unlike cold streams, hot streams broadcast the same items to all listening subscribers. However, if a subscriber arrives later, it won’t receive the previous items. Logically, hot streams represent events or facts rather than known finite data sets.

Let’s imagine a counter. This counter is incremented every second, and its value is emitted in a stream. When a subscriber starts listening, it only gets the data emitted from then on. This behavior is depicted in the me.escoffier.lab.chapter2.Code8 class:

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;

import static me.escoffier.lab.chapter2.HotStream.nap;

public class Code8 {

    public static void main(String... args) {
        Observable<Integer> stream = HotStream.create();

        stream
            .subscribe(
                i -> System.out.println("[A] Received: " + i),
                err -> System.out.println("[A] BOOM"),
                () -> System.out.println("[A] Completion")
            );

        nap();

        stream
            .subscribe(
                i -> System.out.println("[B] Received: " + i),
                err -> System.out.println("[B] BOOM"),
                () -> System.out.println("[B] Completion")
            );

    }
}

When running this application, you can see an output similar to:

[A] Received: 0
[A] Received: 1
[B] Received: 1
[A] Received: 2
[B] Received: 2
[A] Received: 3
[B] Received: 3
[A] Received: 4
[B] Received: 4
...

The first subscriber (A) received the values 0, 1, 2, 3 and 4. The second subscriber (B) arrived one second later and so it missed the value 0. It just received 1, 2, 3 and 4.

There are ways to turn a cold stream into a hot one, such as ConnectableObservable. We won’t cover these in this lab, but we invite you to check the documentation.
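To make the late-subscriber behavior concrete, here is a minimal plain-JDK sketch of a hot source. It is not the lab’s HotStream class (whose code is not shown here): HotCounter, tick and run are hypothetical names, the sketch is single-threaded, and the counter is advanced by hand instead of every second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class HotSourceSketch {

    /** Hypothetical hot counter: each value goes to whoever is subscribed at that moment. */
    static final class HotCounter {
        private final List<Consumer<Integer>> subscribers = new ArrayList<>();
        private int next;

        void subscribe(Consumer<Integer> subscriber) {
            subscribers.add(subscriber);
        }

        /** Emits the next value to the current subscribers; past values are not replayed. */
        void tick() {
            int value = next++;
            for (Consumer<Integer> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    /** Subscribes A, emits once, subscribes B late, then emits twice more. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        HotCounter counter = new HotCounter();

        counter.subscribe(i -> log.add("[A] Received: " + i));
        counter.tick(); // 0 is emitted while only A is listening

        counter.subscribe(i -> log.add("[B] Received: " + i)); // B arrives late
        counter.tick(); // 1 goes to A and B
        counter.tick(); // 2 goes to A and B
        return log;
    }

    public static void main(String... args) {
        run().forEach(System.out::println);
    }
}
```

Unlike a cold source, the emissions do not depend on who subscribes: the counter keeps its own state, and B never sees the value 0.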

3.3.3. Stopping emissions

Cleanup is always a good thing. In the previous example, we had 2 subscribers. Let’s reuse this example, but this time cancel the subscription after some time.

The subscribe method returns an io.reactivex.disposables.Disposable object that allows canceling the subscription using the dispose method. The class me.escoffier.lab.chapter2.Code9 uses this method to cancel the subscription to the hot stream:

package me.escoffier.lab.chapter2;


import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import static me.escoffier.lab.chapter2.HotStream.nap;

public class Code9 {

    public static void main(String... args) {
        Observable<Integer> stream = HotStream.create();

        Disposable s1 = stream
            .subscribe(
                i -> System.out.println("[A] Received: " + i),
                err -> System.out.println("[A] BOOM"),
                () -> System.out.println("[A] Completion")
            );

        // Wait before starting the next subscriber
        nap();

        Disposable s2 = stream
            .subscribe(
                i -> System.out.println("[B] Received: " + i),
                err -> System.out.println("[B] BOOM"),
                () -> System.out.println("[B] Completion")
            );

        nap(5);

        // Cancel the subscription for A
        s1.dispose();

        nap(3);

        // Cancel the subscription for B
        s2.dispose();
    }
}

Once canceled, the subscriber does not receive any more events. It does not get a completion event either.

3.4. Streams, streams, streams…​

In this chapter, we have seen how streams are structured, how they behave and how observers/subscribers work. We have only seen streams containing a set of data. In the next chapter, we will see some more special streams.

4. Single, Completable, Maybe and Flowable

In the previous chapters, we have used streams containing several items. In this chapter, we are going to see some specialized streams:

  • Single - A stream emitting 1 item

  • Maybe - A stream emitting 0 or 1 item

  • Completable - A stream containing no items

  • Flowable - A multi-item stream supporting back-pressure

4.1. Single

A Single is a specialized stream that only emits one item. It works like the Observable streams we have seen previously but is limited to operators that make sense for a single emission. Typically, doOnNext and doOnComplete are replaced by doOnSuccess, which accepts the produced item.

In me.escoffier.lab.chapter3.Code1, you can see how this new method is used:

package me.escoffier.lab.chapter3;


import io.reactivex.Single;

public class Code1 {


    public static void main(String[] args) {
        Single.just("Superman")
            .doOnSuccess(s -> System.out.println("Hello " + s))
            .subscribe();
    }
}

This specialization also affects the subscribe method, which provides a new form accepting the result and the error in a single BiConsumer. In me.escoffier.lab.chapter3.Code2, complete the code to use this new variant.

package me.escoffier.lab.chapter3;


import io.reactivex.Single;

public class Code2_Solution {


    public static void main(String[] args) {
        Single.just("Superman")
            .subscribe(
                (name, err) -> {
                    if (err == null) {
                        System.out.println("Hello " + name);
                    } else {
                        err.printStackTrace();
                    }
                }
            );
    }
}

Singles are often used for asynchronous operations returning a single result, such as an HTTP request. The following example uses the Vert.x Web Client to retrieve a list of superheroes:

package me.escoffier.lab.chapter3;


import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import me.escoffier.superheroes.SuperHeroesService;

import static me.escoffier.superheroes.Helpers.client;

public class Code3 {

    public static void main(String[] args) {
        SuperHeroesService.run();

        client().get("/heroes").rxSend()
            .map(HttpResponse::bodyAsJsonObject)
            .map(JsonObject::size)
            .subscribe(length -> System.out.println("Number of heroes: " + length));
    }
}

client().get("/heroes").rxSend() returns a Single<HttpResponse>. The rest of the processing is called when the response is received from the server.

Super Heroes & Villains service: the previous example is the first use of this service. It runs locally on http://localhost:8080. Don’t forget to stop the example/exercise once done, or the next run won’t work (because the port is already in use). If you want to see what is provided by the service check:

4.2. Maybe

Maybe is a stream that can emit 0 or 1 item. It is useful because Single can’t emit null (null is an illegal value). Maybe observers are notified:

  • when a value is emitted using the onSuccess method,

  • when the stream completes without a value, using the onComplete method,

  • when an error is thrown, using the onError method.

Notice the subtlety about onSuccess and onComplete. The first one is called when there is a value. The second one is called when there is not. This behavior is shown in the class me.escoffier.lab.chapter3.Code4:

package me.escoffier.lab.chapter3;


import io.reactivex.Maybe;

public class Code4 {


    public static void main(String[] args) {
        Maybe.just("Superman")
            .subscribe(
                name -> System.out.println("[A] Received " + name),
                Throwable::printStackTrace,
                () -> System.out.println("[A] Completed")
            );

        Maybe.empty()
            .subscribe(
                name -> System.out.println("[B] Received " + name + " (not called)"),
                Throwable::printStackTrace,
                () -> System.out.println("[B] Completed")
            );
    }
}

The output of this code is:

[A] Received Superman
[B] Completed

Maybe is often used for methods that may return null. For example, an asynchronous version of a findById method would return a Maybe. Let’s use Maybe to check if there is a superhero named "Yoda" and another one named "clement". Open the me.escoffier.lab.chapter3.Code5 class and fill in the code. The output should be something like:

Loaded 727 heroes and villains
Yes, Yoda is a super hero
No, clement is not a super hero

package me.escoffier.lab.chapter3;


import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import me.escoffier.superheroes.SuperHeroesService;

import static me.escoffier.superheroes.Helpers.client;

public class Code5_Solution {


    public static void main(String[] args) {
        SuperHeroesService.run();
        String name1 = "Yoda";
        String name2 = "clement";

        client().get("/heroes").rxSend()
            .map(HttpResponse::bodyAsJsonObject)
            .filter(json -> contains(name1, json))
            .subscribe(
                x -> System.out.println("Yes, " + name1 + " is a super hero"),
                Throwable::printStackTrace,
                () -> System.out.println("No, " + name1 + " is not a super hero")
            );


        client().get("/heroes").rxSend()
            .map(HttpResponse::bodyAsJsonObject)
            .filter(json -> contains(name2, json))
            .subscribe(
                x -> System.out.println("Yes, " + name2 + " is a super hero"),
                Throwable::printStackTrace,
                () -> System.out.println("No, " + name2 + " is not a super hero")
            );
    }

    private static boolean contains(String name, JsonObject json) {
        return json.stream().anyMatch(e -> e.getValue().toString().equalsIgnoreCase(name));
    }
}

Don’t forget to stop the process once done.

4.3. Completable

Completable represents a stream not emitting a value but simply concerned with an action being executed. As a consequence, it does not provide a doOnNext method as there is no next. It indicates the successful completion of a (potentially asynchronous) process or its failure:

package me.escoffier.lab.chapter3;


import io.reactivex.Completable;

public class Code6 {

    public static void main(String[] args) {
        Completable.fromAction(() -> System.out.println("Hello"))
            .subscribe(
                () -> System.out.println("OK"),
                err -> System.out.println("KO")
            );
    }

}

In me.escoffier.lab.chapter3.Code7, fill in the code to write a simple message into a file. For this, use the method named rxWriteFile that accepts the path (such as hello.txt) and a Buffer (such as Buffer.buffer("hello")). It returns a Completable indicating when the file has been written (and whether the write succeeded).

package me.escoffier.lab.chapter3;

import io.vertx.reactivex.core.buffer.Buffer;

import static me.escoffier.superheroes.Helpers.fs;

/**
 * @author <a href="http://escoffier.me">Clement Escoffier</a>
 */
public class Code7_Solution {

    public static void main(String[] args) {
        fs()
            .rxWriteFile(
                "hello.txt", Buffer.buffer("hello")
            )
            .subscribe(
                () -> System.out.println("File written"),
                Throwable::printStackTrace
            );
    }
}

4.4. Flowable and back pressure

So far, we have seen examples of streams that were pushing items to subscribers. However, there is an issue with this model. If your consumer cannot keep up with the pace, something bad is going to happen. Putting a buffer in between will only handle small bumps. This is where back-pressure comes into the picture. But first, let’s illustrate the problem.

In me.escoffier.lab.chapter3.Code8, we create a stream of integers and display them in the subscribe method after a small nap:

package me.escoffier.lab.chapter3;

import io.reactivex.Observable;

public class Code8 {

    public static void main(String[] args) {
        // Create an observable emitting all numbers between 1 and 10000
        Observable.range(1, 10000)
            .map(Item::new)
            .subscribe(
                item -> {
                    nap();
                    System.out.println("Received : " + item.i);
                }
            );
    }


    private static class Item {
        private final int i;

        Item(int number) {
            System.out.println("Constructing item using " + number);
            this.i = number;
        }
    }

    private static void nap() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // Ignore me.
        }
    }

}

It gives the following output:

Constructing item using 1
Received : 1
Constructing item using 2
Received : 2
Constructing item using 3
Received : 3
Constructing item using 4
Received : 4

If you run this example, everything is fine. Emissions are processed one at a time, from the source all the way down to the subscriber. This is because a single thread is involved in the process, making everything synchronous. Processing is serialized: no problem so far.

Now let’s introduce a change of thread (Schedulers will be covered later - just trust us for now). This code is in me.escoffier.lab.chapter3.Code9:

package me.escoffier.lab.chapter3;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import me.escoffier.lab.chapter2.HotStream;

public class Code9 {

    public static void main(String[] args) {
        // Create an observable emitting all numbers between 1 and 999_999_999
        Observable.range(1, 999_999_999)
            .map(Item::new)
            // Emissions are made on the caller thread (main).
            // The next processing stages and the terminal subscriber
            // are now called on a separate thread (io thread).
            .observeOn(Schedulers.io())
            .subscribe(
                item -> {
                    nap();
                    System.out.println("Received : " + item.i);
                }
            );

        // Wait for 20 seconds. Without this the process will terminate immediately.
        HotStream.nap(20);
    }


    private static class Item {
        private final int i;

        Item(int number) {
            System.out.println("Constructing item using " + number);
            this.i = number;
        }
    }

    private static void nap() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // Ignore me.
        }
    }

}

Running this example produces an output similar to the following one:

...
Constructing item using 707290
Constructing item using 707291
Constructing item using 707292
Constructing item using 707293
Received : 95
Constructing item using 707294
Constructing item using 707295
Constructing item using 707296
Constructing item using 707297
Constructing item using 707298
Constructing item using 707299
...

We have constructed more than 700,000 items, while the subscriber is only processing item 95! The source emits the numbers much faster than the consumer can handle them, and because observeOn pushes the emissions into an unbounded buffer, this can be the source of many problems such as… running out of memory.

To mitigate this issue, RX Java 2 provides a stream type named Flowable. Flowable is like Observable (it may contain multiple items) but implements a back-pressure protocol. This protocol lets the consumer tell the source stream at which pace to emit items. The protocol is defined by the Reactive Streams specification, whose interfaces have also been included in the JDK since Java 9 under the name java.util.concurrent.Flow, and it is becoming widely popular.
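The request/emit handshake behind this protocol can be sketched with the java.util.concurrent.Flow interfaces alone, so it needs Java 9+ and no RX dependency. This is not how RX Java implements Flowable: RangePublisher and CollectingSubscriber are hypothetical names made up for the illustration, and the sketch is single-threaded and skips the validation a real Reactive Streams publisher must perform (for example rejecting request(0)).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackPressureSketch {

    /** Hypothetical source: emits start, start + 1, ... but never more than requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int end; // exclusive

        RangePublisher(int start, int count) {
            this.start = start;
            this.end = start + count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private long demand;      // items requested but not yet emitted
                private boolean emitting; // true while the loop below is running
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the running loop picks it up
                    }
                    emitting = true;
                    while (demand > 0 && next < end && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == end) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical consumer: stores what it receives and requests items only when told to. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }

        void request(long n) { subscription.request(n); }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new RangePublisher(1, 5).subscribe(subscriber);
        System.out.println(subscriber.received); // [] : nothing requested, nothing emitted
        subscriber.request(2);
        System.out.println(subscriber.received); // [1, 2] : the source waits for more demand
        subscriber.request(10);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

The source never runs ahead of the consumer: items are only produced once they have been requested, which is what removes the need for an unbounded buffer.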

Let’s revisit the previous example, but instead of Observable.range, let’s use Flowable.range. This code is in me.escoffier.lab.chapter3.Code10:

package me.escoffier.lab.chapter3;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import me.escoffier.lab.chapter2.HotStream;

public class Code10 {

    public static void main(String[] args) {
        // Create a flowable emitting all numbers between 1 and 999_999_999
        Flowable.range(1, 999_999_999)
            .map(Item::new)
            // Emissions are made on the caller thread (main)
            // The next processing stages and the terminal subscriber
            // are now called on a separate thread (io thread).
            .observeOn(Schedulers.io())
            .subscribe(
                item -> {
                    nap();
                    System.out.println("Received : " + item.i);
                }
            );

        // Wait for 20 seconds. Without this the process will terminate immediately.
        HotStream.nap(20);
    }


    private static class Item {
        private final int i;

        Item(int number) {
            System.out.println("Constructing item using " + number);
            this.i = number;
        }
    }

    private static void nap() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            // Ignore me.
        }
    }

}

Running this example produces something like:

....
Constructing item using 123
Constructing item using 124
Constructing item using 125
Constructing item using 126
Constructing item using 127
Constructing item using 128
Received : 1
Received : 2
Received : 3
Received : 4
Received : 5
....
Constructing item using 221
Constructing item using 222
Constructing item using 223
Constructing item using 224
Received : 97
Received : 98
Received : 99
Received : 100
Received : 101
Received : 102
Received : 103
....

What we can see here is that the source emits a first batch of 128 items (the default buffer size) and then stops emitting while the rest of the flow processes 96 of them. The consumer is telling the source that it can’t handle more at that time, so the source stops emitting. When the consumer can finally handle more items, it requests more from the source. No more risk of OOM!

Note that RX Java 2 provides other back-pressure strategies, such as buffering or dropping data. Check the documentation for further details on these strategies.

4.5. Conclusion

In this chapter, we have seen a few different specialized streams to handle various situations. The following table summarizes when to use what:

Type | Use case
Single | asynchronous operation returning 1 result
Maybe | asynchronous operation returning either 0 or 1 result
Completable | asynchronous operation not returning a result. Indicates the completion
Observable | sequence of data, no back pressure
Flowable | sequence of data, back-pressured

It’s great to have all these types, but the true power of RX Java comes from its set of operators.

5. Stream Operators

We have already seen a few operators, such as map, filter, and the different doOnX methods. In this section, we will introduce some more. We can’t cover all of them, so we will just explore the most common ones.

5.1. Filter, First, Skip, Take…​

This first set of operators allows us to select the set of items to re-emit from a source.

5.1.1. Filter

The filter operator accepts a predicate checking if the received item should be forwarded. In me.escoffier.lab.chapter4.Code1#main, fill in the code to generate a stream containing only villains with Queen in their name.

package me.escoffier.lab.chapter4;

import me.escoffier.superheroes.SuperHeroesService;

import static me.escoffier.superheroes.Helpers.villains_names;

public class Code1_Solution {

    public static void main(String[] args) {
        SuperHeroesService.run();

        villains_names()
            .filter(name -> name.contains("Queen"))
            .subscribe(System.out::println);

    }

}

Note: On a Single, the filter method returns a Maybe, as the item may not pass the predicate.

5.1.2. First & Last

On Observable and Flowable, there is a set of first operators that select the first item of a stream:

  • first(T def) takes the first item, or emits the given default if the source stream is empty. This method returns a Single.

  • firstElement() forwards the first item from the observed stream. This method returns a Maybe as the observed stream can be empty.

  • firstOrError() forwards the first item from the observed stream. If the observed stream is empty, it emits an error (a NoSuchElementException). This method returns a Single.

The last / lastElement / lastOrError operators are the mirror operators, forwarding only the last item of the stream.

5.1.3. Skip and Take

The skip operator creates a stream that skips the first items emitted by the observed stream and emits the remainder. skip accepts a number of items, but also provides a time-based variant skipping all items emitted during a given time window. skipLast is the mirror operator trimming the end of the stream.

The take operator creates a stream which emits only the first items emitted by the observed stream. Like skip, take provides a time-based variant and a takeLast variant.

Open the me.escoffier.lab.chapter4.Code2 class and edit the code to create a stream containing the 10 villains at positions 21 to 30.

package me.escoffier.lab.chapter4;

import me.escoffier.superheroes.SuperHeroesService;

import static me.escoffier.superheroes.Helpers.villains_names;

public class Code2_Solution {

    public static void main(String[] args) {
        SuperHeroesService.run();

        villains_names()
            .skip(20)
            .take(10)
            .subscribe(System.out::println);
    }

}

The output should look like this:

Loaded 727 heroes and villains
Anti-Monitor
Red Skull
Anti-Spawn
Red Mist
Redeemer II
Apocalypse
Rhino
Redeemer III
Arclight
Rick Flag
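To double-check which positions skip(20) followed by take(10) keeps, here is a small sketch that uses the plain java.util.stream API as a stand-in for the RX operators: skip plays the role of skip and limit plays the role of take. The class and method names are made up for the illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class SkipTakeSketch {

    /** 1-based positions kept when skipping `skipped` items and then taking `taken` items. */
    static List<Integer> keptPositions(int total, int skipped, int taken) {
        return IntStream.rangeClosed(1, total)
            .skip(skipped)  // stand-in for skip(n)
            .limit(taken)   // stand-in for take(n)
            .boxed()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
        System.out.println(keptPositions(40, 20, 10));
    }
}
```

Skipping 20 items means the first item kept is the 21st, and taking 10 items ends at the 30th.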

5.2. Default and Switch

Sometimes, when you realize that your stream is empty, you want to inject some sensible defaults. That’s what the defaultIfEmpty and switchIfEmpty operators are doing.

me.escoffier.lab.chapter4.Code3 shows an example of switchIfEmpty. switchIfEmpty switches to another stream, which can emit several elements, as in:

package me.escoffier.lab.chapter4;

import io.reactivex.Observable;
import me.escoffier.superheroes.SuperHeroesService;

import static me.escoffier.superheroes.Helpers.heroes_names;

public class Code3 {

    public static void main(String[] args) {
        SuperHeroesService.run();

        heroes_names()
            .filter(s -> s.equals("Asterix"))
            .switchIfEmpty(Observable.just("Oh", "no", "...", "Asterix", "is", "not", "a", "super", "hero"))
            .subscribe(System.out::println);
    }

}

me.escoffier.lab.chapter4.Code4 gives an example of defaultIfEmpty. It basically injects a default value if the observed stream is empty.

package me.escoffier.lab.chapter4;

import me.escoffier.superheroes.SuperHeroesService;

import static me.escoffier.superheroes.Helpers.heroes_names;

public class Code4 {

    public static void main(String[] args) {
        SuperHeroesService.run();

        heroes_names()
            .filter(s -> s.equals("Asterix"))
            .defaultIfEmpty("Oh no... Asterix is not a super hero")
            .subscribe(System.out::println);
    }

}

5.3. Scan and Reduce

Scan and reduce are two very close operators. They both take a seed (there are variants without) and an accumulator function. This function is called with the previous result and the item coming from the observed stream.

The me.escoffier.lab.chapter4.Code5 illustrates these two operators.

package me.escoffier.lab.chapter4;


import io.reactivex.Flowable;

import java.util.HashSet;

import static me.escoffier.superheroes.Helpers.heroes;

public class Code5 {

    public static void main(String[] args) {
        Flowable.range(0, 10)
            .scan(0, (last_result, item) -> last_result + item)
            .subscribe(i -> System.out.println("[Scan] Got " + i));

        Flowable.range(0, 10)
            .reduce(0, (last_result, item) -> last_result + item)
            .subscribe(i -> System.out.println("[Reduce] Got " + i));
    }

}

Run this example and check the difference. reduce emits a single value once the observed stream completes. scan emits all the computed values, including the seed (0 (seed), 0 + 0, 0 + 1, 1 + 2…).

[Scan] Got 0
[Scan] Got 0
[Scan] Got 1
[Scan] Got 3
[Scan] Got 6
[Scan] Got 10
[Scan] Got 15
[Scan] Got 21
[Scan] Got 28
[Scan] Got 36
[Scan] Got 45
[Reduce] Got 45
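The difference can also be written down without any RX type. The following plain-Java sketch (hypothetical helper names, working on a list instead of a stream) computes what each operator would emit for the same seed and accumulator:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ScanReduceSketch {

    /** Like scan: returns the seed followed by every intermediate result. */
    static <T> List<T> scan(List<T> items, T seed, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T current = seed;
        emitted.add(current);
        for (T item : items) {
            current = accumulator.apply(current, item);
            emitted.add(current);
        }
        return emitted;
    }

    /** Like reduce: returns only the final result. */
    static <T> T reduce(List<T> items, T seed, BinaryOperator<T> accumulator) {
        T current = seed;
        for (T item : items) {
            current = accumulator.apply(current, item);
        }
        return current;
    }

    public static void main(String[] args) {
        List<Integer> range = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        // Prints [Scan] [0, 0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
        System.out.println("[Scan] " + scan(range, 0, Integer::sum));
        // Prints [Reduce] 45
        System.out.println("[Reduce] " + reduce(range, 0, Integer::sum));
    }
}
```

For n items, scan with a seed produces n + 1 values while reduce produces exactly one.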

Open me.escoffier.lab.chapter4.Code6 and me.escoffier.lab.chapter4.Code7 and fill in the code to use the scan and reduce methods. You will finally know if superheroes have more superpowers than supervillains.

package me.escoffier.lab.chapter4;


import java.util.HashSet;

import static me.escoffier.superheroes.Helpers.heroes;

public class Code6_Solution {

    public static void main(String[] args) {
        heroes()
            .scan(new HashSet<>(), (set, superstuff) -> {
                set.addAll(superstuff.getSuperpowers());
                return set;
            })
            .doOnNext(System.out::println)
            // scan emits every intermediate set, so keep only the last one
            .lastOrError()
            .subscribe(
                set -> System.out.println("Heroes have " + set.size() + " unique super powers")
            );
    }

}

package me.escoffier.lab.chapter4;


import java.util.HashSet;

import static me.escoffier.superheroes.Helpers.villains;

public class Code7_Solution {

    public static void main(String[] args) {
        villains()
            .reduce(new HashSet<>(), (set, superstuff) -> {
                set.addAll(superstuff.getSuperpowers());
                return set;
            })
            .doOnSuccess(System.out::println)
            .subscribe(
                set -> System.out.println("Villains have " + set.size() + " unique super powers")
            );
    }

}

5.4. Mapping a stream to a collection and vice-versa

It’s often required to accumulate items from a stream into a collection (only for bounded streams of course) or to transform a collection into a stream.

RX Java provides a set of methods to transform a stream into a collection, such as toList (creating a list of items), toMap (building a map) and toMultimap (building a Map<K, Collection<V>>). In the class me.escoffier.lab.chapter4.Code8 you can see an example of the toList operator:

package me.escoffier.lab.chapter4;


import me.escoffier.superheroes.Character;

import static me.escoffier.superheroes.Helpers.villains;

public class Code8 {

    public static void main(String[] args) {
        villains()
            .map(Character::getName)
            .toList()
            .subscribe(list -> System.out.println("Collected " + list.size() + " names"));
    }

}
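To picture the shape of the collections produced by toMap and toMultimap, here is a rough analogy using java.util.stream collectors rather than the RX operators themselves. The class name, the method names and the sample names are made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class CollectSketch {

    /** One value per key, in the spirit of toMap: name -> length (the last value wins). */
    static Map<String, Integer> lengthByName(List<String> names) {
        return names.stream()
            .collect(Collectors.toMap(name -> name, String::length, (first, second) -> second, TreeMap::new));
    }

    /** Several values per key, in the spirit of toMultimap: first letter -> names. */
    static Map<String, List<String>> namesByInitial(List<String> names) {
        return names.stream()
            .collect(Collectors.groupingBy(name -> name.substring(0, 1), TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Joker", "Juggernaut", "Loki");
        System.out.println(lengthByName(names));   // {Joker=5, Juggernaut=10, Loki=4}
        System.out.println(namesByInitial(names)); // {J=[Joker, Juggernaut], L=[Loki]}
    }
}
```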

To transform a collection into a stream, use Observable.fromIterable or Flowable.fromIterable. fromArray is also quite convenient.

5.5. FlatMap

FlatMap is the operator to understand. It is very powerful but requires a bit more explanation. FlatMap takes each item emitted by the observed stream and maps it to another stream (this is the map part). Then, it merges the emissions from the returned streams into a single stream (this is the flat part).

The simplest usage of flatMap is to produce a stream of items out of a single item. For example, in me.escoffier.lab.chapter4.Code9 you can see how the flatMap operator is used to produce a stream of words:

package me.escoffier.lab.chapter4;


import io.reactivex.Flowable;
import io.reactivex.Single;

public class Code9 {

    public static void main(String[] args) {
        String text = "Super heroes and super villains";
        Single.just(text)
            .flatMapPublisher(s -> Flowable.fromArray(s.split(" ")))
            .subscribe(System.out::println);
    }

}

You may wonder why the method is named flatMapPublisher. By default, flatMap returns a stream of the same type, so Single.flatMap returns a Single. Fortunately, the methods flatMapCompletable, flatMapMaybe, flatMapObservable and flatMapPublisher (for Flowable) let us change the type of stream.

FlatMap is also used as a composing operator to express a sequential composition. For example, chaining two HTTP requests as illustrated in me.escoffier.lab.chapter4.Code10:

package me.escoffier.lab.chapter4;


import io.reactivex.Observable;
import io.reactivex.Single;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import me.escoffier.superheroes.SuperHeroesService;

import static me.escoffier.superheroes.Helpers.client;

public class Code10 {

    public static void main(String[] args) {
        SuperHeroesService.run();


        Single<JsonObject> request1 = client()
            .get("/heroes")
            .rxSend()
            .map(HttpResponse::bodyAsJsonObject);


        request1
            // Transform the response to retrieve a stream of ids.
            .flatMapObservable(j -> Observable.fromIterable(j.fieldNames()))
            // Take the first one
            .take(1)
            // this is an observable of 1 element
            // Second request
            .flatMapSingle(Code10::getHero)

            // Print the result
            .subscribe(json -> System.out.println(json.encodePrettily()));

    }

    private static Single<JsonObject> getHero(String s) {
        return client().get("/heroes/" + s).rxSend().map(HttpResponse::bodyAsJsonObject);
    }

}

You can also use flatMap on a Single to execute a second request once the Single has completed, as illustrated in me.escoffier.lab.chapter4.Code11:

package me.escoffier.lab.chapter4;


import io.reactivex.Observable;
import io.reactivex.Single;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import me.escoffier.superheroes.SuperHeroesService;

import static me.escoffier.superheroes.Helpers.client;

public class Code11 {

    public static void main(String[] args) {
        SuperHeroesService.run();


        Single<JsonObject> request1 = client()
            .get("/heroes")
            .rxSend()
            .map(HttpResponse::bodyAsJsonObject);



        request1
            // Transform the response to retrieve a stream of ids.
            .flatMapObservable(j -> Observable.fromIterable(j.fieldNames()))
            // Take the first one, as a Single
            .firstOrError()

            // Second request
            .flatMap(Code11::getHero)

            // Print the result
            .subscribe(json -> System.out.println(json.encodePrettily()));

    }

    private static Single<JsonObject> getHero(String s) {
        return client().get("/heroes/" + s).rxSend().map(HttpResponse::bodyAsJsonObject);
    }

}

5.6. Merge and Zip

RX Java also provides operators to merge streams or to associate items from different streams.

The mergeWith operator merges the current stream with other ones. The produced stream contains all the items from the merged streams in the order of their emissions. If you prefer not to mix the items, use concatWith, but be aware that it subscribes to the next stream only once the previous one has completed, so it does not work when the first stream is unbounded.

Open the class me.escoffier.lab.chapter4.Code12 and fill in the code to count the number of unique superpowers.

package me.escoffier.lab.chapter4;


import io.reactivex.Flowable;
import me.escoffier.superheroes.Character;

import static me.escoffier.superheroes.Helpers.heroes;
import static me.escoffier.superheroes.Helpers.villains;

public class Code12_Solution {

    public static void main(String[] args) {
        Flowable<String> villains_superpowers =
            villains().map(Character::getSuperpowers)
                .flatMap(Flowable::fromIterable);
        Flowable<String> heroes_superpowers =
            heroes().map(Character::getSuperpowers)
                .flatMap(Flowable::fromIterable);

        // Merge both stream using the `mergeWith` operator

        villains_superpowers.mergeWith(heroes_superpowers)
            // Filter out duplicates using the `distinct` operator
            .distinct()
            // Count the number of item using the count operator
            .count()
            // Subscribe to print the number of unique super powers
            .subscribe(number -> System.out.println("Number of super powers: " + number));


    }

}

The zipWith operator associates items from different streams. It takes the next item emitted by each stream and calls a function with all of them. For example, we can use this to generate superheroes vs. supervillains battles. Open the me.escoffier.lab.chapter4.Code13 class and fill in the code to generate fights between heroes and villains.

package me.escoffier.lab.chapter4;


import io.reactivex.Single;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import me.escoffier.superheroes.SuperHeroesService;
import me.escoffier.superheroes.Character;

import static me.escoffier.superheroes.Helpers.client;

public class Code13_Solution {

    public static void main(String[] args) {
        SuperHeroesService.run();

        Single<Character> random_heroes = client()
            .get("/heroes/random")
            .as(BodyCodec.json(Character.class))
            .rxSend()
            .map(HttpResponse::body);

        Single<Character> random_villains = client()
            .get("/villains/random")
            .as(BodyCodec.json(Character.class))
            .rxSend()
            .map(HttpResponse::body);

        random_heroes.zipWith(random_villains, (h, v) -> fight(h, v))
            .subscribe(j -> System.out.println(j.encodePrettily()));

    }

    private static JsonObject fight(Character h, Character v) {
        String winner = h.getName();
        if (v.getSuperpowers().size() > h.getSuperpowers().size()) {
            winner = v.getName();
        } else if (v.getSuperpowers().size() == h.getSuperpowers().size()) {
            winner = "none";
        }
        return new JsonObject()
            .put("hero", h.getName())
            .put("villain", v.getName())
            .put("winner", winner);
    }
}
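With two Single, zipWith produces exactly one pair. On streams of several items, the pairing is positional: the first item of one stream meets the first item of the other, and so on until one side runs out. The following plain-Java sketch shows that pairing rule on lists; the helper and the sample names are hypothetical and no RX type is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

final class ZipSketch {

    /** Combines the i-th item of each list; stops when the shorter list runs out. */
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> result = new ArrayList<>();
        int pairs = Math.min(left.size(), right.size());
        for (int i = 0; i < pairs; i++) {
            result.add(combiner.apply(left.get(i), right.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> heroes = Arrays.asList("Hero A", "Hero B", "Hero C");
        List<String> villains = Arrays.asList("Villain X", "Villain Y");
        // Prints two fights; "Hero C" has no opponent and is left out.
        zip(heroes, villains, (h, v) -> h + " vs " + v).forEach(System.out::println);
    }
}
```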

5.7. Error recovery

Even when using RX, errors and failures happen. I told you it’s not a silver bullet. But I’ve got good news for you: there are operators to handle error recovery.

Let’s start with a simple case. Imagine you want to emit a request to our Super Heroes and Villains service, but the service is not started. Something like the me.escoffier.lab.chapter4.Code14 class:

package me.escoffier.lab.chapter4;


import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import me.escoffier.superheroes.Character;

import static me.escoffier.superheroes.Helpers.client;

public class Code14 {

    public static void main(String[] args) {
        client()
            .get("/heroes/random")
            .as(BodyCodec.json(Character.class))
            .rxSend()
            .map(HttpResponse::body)
            .map(Character::getName)
            .subscribe(
                name -> System.out.println("Retrieved: " + name),
                err -> System.err.println("Oh no... something bad happened: " + err)
            );

    }

}

When you run this program, you get:

Oh no... something bad happened: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8080

Let’s now use the onErrorReturnItem operator, which lets you recover from an error by injecting a default result. There is also a variant named onErrorReturn that lets you decide which value to return based on the error (Throwable). Edit the class to use onErrorReturnItem or onErrorReturn to recover from the error.

package me.escoffier.lab.chapter4;


import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import me.escoffier.superheroes.Character;

import static me.escoffier.superheroes.Helpers.client;

public class Code14_Solution {

    public static void main(String[] args) {
        client()
            .get("/heroes/random")
            .as(BodyCodec.json(Character.class))
            .rxSend()
            .map(HttpResponse::body)
            .map(Character::getName)
            .onErrorReturnItem("Clement, even if I'm not a superhero")
            .subscribe(
                name -> System.out.println("Retrieved: " + name),
                err -> System.err.println("Oh no... something bad happened: " + err)
            );

    }

}

The onErrorResumeNext operator is similar to onErrorReturnItem but returns a stream. Basically, if an error happens, it replaces the stream with the given one. Open the me.escoffier.lab.chapter4.Code15 class and use the onErrorResumeNext to inject another Single. You can use Single.just to create a stream with an already known value.

package me.escoffier.lab.chapter4;


import io.reactivex.Single;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import me.escoffier.superheroes.Character;

import static me.escoffier.superheroes.Helpers.client;

public class Code15_Solution {

    public static void main(String[] args) {
        client()
            .get("/heroes/random")
            .as(BodyCodec.json(Character.class))
            .rxSend()
            .map(HttpResponse::body)
            .map(Character::getName)
            .onErrorResumeNext(Single.just("Clement, even if I'm not a superhero"))
            .subscribe(
                name -> System.out.println("Retrieved: " + name),
                err -> System.err.println("Oh no... something bad happened: " + err)
            );

    }

}

The onErrorResumeNext is quite useful when you want to execute another async operation when an initial one failed. It also can substitute a stream in error with a default stream similar to a switchOnError operator (that does not exist).

The last error recovery mechanism we are going to see is very powerful but can be dangerous too. The retry operator re-subscribes to a stream that emitted an error. Its behavior depends on the cold/hot nature of the observed stream. On a cold stream, the resubscription restarts the stream from the beginning. It’s very useful to retry a failed operation such as an HTTP call. The retry operator has several variants letting you decide whether or not you want to retry based on the error and the number of attempts.
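The idea of retrying a cold source, that is, running the whole operation again from the beginning after a failure, can be sketched in plain Java. This is only an illustration of the principle, not the RX Java implementation, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetrySketch {

    /** Runs the operation, and re-runs it from scratch after a failure, up to maxRetries times. */
    static <T> T retry(Supplier<T> operation, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again
            }
        }
        throw lastFailure; // every attempt failed
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated flaky call: fails twice, then succeeds.
        String result = retry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("Connection refused");
            }
            return "Retrieved after " + calls.get() + " attempts";
        }, 5);
        System.out.println(result); // Retrieved after 3 attempts
    }
}
```

The danger mentioned above is visible here: every retry executes the operation again, so retrying an operation with side effects, or retrying without a limit, can do more harm than good.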

5.8. Conclusion

This chapter introduced a few operators. RX Java 2 offers many more. Check the operator list for the full set.

Now you have enough knowledge to start building an RX Java API.

6. How to create an RX API

Now that you are experts in RX, let’s see how you can create an RX API for your users. First, what do we call RX API? An RX API is an API using, and generally returning RX Java types such as Single, Completable or Flowable. This API is made to be asynchronous. Building such an API has several benefits:

  • it gives the user the ability to combine calls to your API with the RX operators - it’s a great way to handle asynchrony.

  • it saves you from re-implementing structures to handle streams or async types - because don’t be mistaken: asynchrony is hard.

When building such an API there are several cases and questions you need to answer:

  1. which types for which method - which RX type do you provide or consume

  2. do you have an existing RX API or do you need to consume a potentially blocking API

The answer to the first point depends on how many results you may return. The following table gives you some rules:

Type of operation | Synchronous signature | RX signature | Example of use case
Operation not returning a result | void x() | Completable x() | flush()
Operation returning one result | T x() | Single<T> x() | get()
Operation returning 0 (null) or 1 result | T x() or Optional<T> x() | Maybe<T> x() or Single<Optional<T>> x() | findById()
Operation returning multiple results fetched in one step | Collection<T> x() | Single<List<T>> x() | getEntries()
Operation returning multiple results produced externally | x(Listener<T> listener) | Observable<T> x() | clicks(), keystrokes()
Operation returning multiple results fetched in multiple steps supporting back-pressure (batching, paging, windowing) | Cursor<T> x() | Flowable<T> x() | findAll()

In this chapter, we are going to create several versions of an API using the Superheroes and villains service. Unlike the one you used before, this one will retrieve the content directly from the file system. The API is defined in me.escoffier.lab.chapter5.SuperAPI:

package me.escoffier.lab.chapter5;

import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import me.escoffier.superheroes.Character;

/**
 * An API to retrieve Super heroes and villains
 */
public interface SuperAPI {

    /**
     * @return a random hero
     */
    Single<Character> hero();

    /**
     * @return a random villain
     */
    Single<Character> villain();

    /**
     * @return all heroes
     */
    Flowable<Character> heroes();

    /**
     * @return all villains
     */
    Flowable<Character> villains();

    /**
     * Looks for a character with the given name.
     *
     * @param name the name of the character. Must not be {@code null}
     * @return a {@code Maybe} completed with the found character, empty otherwise
     */
    Maybe<Character> findByName(String name);

    /**
     * Looks for a character with the given name.
     *
     * @param name the name of the character. Must not be {@code null}
     * @return a {@code Single} completed with the found character, or failed if not found
     */
    Single<Character> findByNameOrError(String name);
}

6.1. Building an RX API from another RX API

If you’re lucky and your API is already using RX types in its implementation, then writing an RX API is usually easy: it’s a matter of using the techniques we showed in the previous chapters to match the flows to what you want to expose.

In this section, we are going to implement SuperAPI based on the FileSystem rx API provided by Eclipse Vert.x.

IMPORTANT: don’t forget to stop the example before running the next one.

6.1.1. Reading a file with an RX implementation

Let’s have a look at the Vert.x FileSystem API and its rx-ified version. In me.escoffier.lab.chapter5.Code1, we read the full set of superheroes and villains:

package me.escoffier.lab.chapter5;

import io.reactivex.Single;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.file.FileSystem;

public class Code1 {

	public static void main(String[] args) {
		getFile().subscribe(System.out::println, Throwable::printStackTrace);
	}

	static Single<String> getFile() {
		Vertx vertx = Vertx.vertx();
		FileSystem fileSystem = vertx.fileSystem();
		return fileSystem.rxReadFile("src/main/resources/characters.json")
				.map(buffer -> buffer.toString());
	}
}

As you can see it’s just a matter of building the right pipeline to get the file contents and transform it into the result we are looking for. For instance, me.escoffier.lab.chapter5.Code2 creates a JsonArray from the contents of the file:

package me.escoffier.lab.chapter5;

import io.reactivex.Single;
import io.vertx.core.json.JsonArray;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.file.FileSystem;

public class Code2 {

    public static void main(String[] args) {
        load()
            .map(array -> array.encodePrettily())
            .subscribe(System.out::println, Throwable::printStackTrace);
    }

    static Single<JsonArray> load() {
        Vertx vertx = Vertx.vertx();
        FileSystem fileSystem = vertx.fileSystem();
        return fileSystem.rxReadFile("src/main/resources/characters.json")
            .map(buffer -> buffer.toString())
            .map(content -> new JsonArray(content));
    }
}

Run this example, and the output is a bit more understandable.

6.1.2. Squares and circles: adapting the pipelines

Quite often you will find that the RX API you’re using internally doesn’t quite match the RX API you want to expose. For example, suppose we want to build an API that exposes a Flowable of superheroes using the Vert.x FileSystem. Reading the file gives us a Single<Buffer>, and we want to end up with a Flowable<Character>, so we have to adapt our pipeline to spread the single result into a stream. This is usually done with the flatMapPublisher variant of flatMap.

In me.escoffier.lab.chapter5.Code3, we map the objects read from the file to Character:

package me.escoffier.lab.chapter5;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.file.FileSystem;
import me.escoffier.superheroes.Character;

public class Code3 {

    public static void main(String[] args) {
        load()
            .subscribe(System.out::println, Throwable::printStackTrace);
    }

    static Flowable<Character> load() {
        Vertx vertx = Vertx.vertx();
        FileSystem fileSystem = vertx.fileSystem();
        return fileSystem.rxReadFile("src/main/resources/characters.json")
            .map(buffer -> buffer.toString())
            .map(content -> new JsonArray(content))
            .flatMapPublisher(array -> Flowable.fromIterable(array))
            .cast(JsonObject.class)
            .map(json -> json.mapTo(Character.class));
    }
}

Notice the usage of the cast operator that just checks that the item can be cast to the given class and casts it.

6.1.3. Implementing the heroes and villains methods

As you have seen above, when you have an RX API, providing your own API on top of it is just about building the right pipeline. Let’s start with the easy ones: villains and heroes. Open the me.escoffier.lab.chapter5.Code4 class and fill in the code to return the streams of heroes and villains. Use the Character.isVillain method as a predicate to know if the character is on the good side or the bad side.

package me.escoffier.lab.chapter5;

import io.reactivex.Flowable;
import me.escoffier.superheroes.Character;

public class Code4_Solution extends AbstractSuperAPI {

    public static void main(String[] args) {
        new Code4_Solution().heroes()
            .count()
            .subscribe(i -> System.out.println(i + " heroes loaded"), Throwable::printStackTrace);

        new Code4_Solution().villains()
            .count()
            .subscribe(i -> System.out.println(i + " villains loaded"), Throwable::printStackTrace);
    }

    @Override
    public Flowable<Character> heroes() {
        return load()
            .filter(character -> !character.isVillain());
    }

    @Override
    public Flowable<Character> villains() {
        return load()
            .filter(character -> character.isVillain());
    }
}

6.1.4. Implementing the hero and villain methods

Let’s increase the difficulty a bit with the hero and villain methods, which each return a random hero or villain. Open the me.escoffier.lab.chapter5.Code5 class and edit the code to implement the 2 methods. The solution proposes 2 different approaches.

package me.escoffier.lab.chapter5;

import io.reactivex.Observable;
import io.reactivex.Single;
import me.escoffier.superheroes.Character;

import java.util.Collections;

public class Code5_Solution extends AbstractSuperAPI {

    public static void main(String[] args) {
        new Code5_Solution().hero()
            .subscribe(System.out::println, Throwable::printStackTrace);

        new Code5_Solution().villain()
            .subscribe(System.out::println, Throwable::printStackTrace);
    }

    @Override
    public Single<Character> hero() {
        return load()
            .filter(character -> !character.isVillain())
            .toList()
            .map(list -> {
                Collections.shuffle(list);
                return list;
            })
            .flatMapObservable(list -> Observable.fromIterable(list))
            .firstOrError();
    }

    @Override
    public Single<Character> villain() {
        return load()
            .filter(character -> character.isVillain())
            .toList()
            .map(list -> {
                if (list.isEmpty()) {
                    throw new RuntimeException("No villains");
                }
                Collections.shuffle(list);
                return list.get(0);
            });
    }

}
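Both approaches in the solution shuffle the whole list in order to keep a single element. A possible third option, sketched here in plain JDK code, is to pick one element at a random index. The RandomPick class and the pickRandom helper are hypothetical names, not part of the lab; the helper could be called from the map step that follows toList().

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;

class RandomPick {

    // Hypothetical helper: picks one element at a random index without reordering the list.
    static <T> T pickRandom(List<T> items, Random random) {
        if (items.isEmpty()) {
            throw new NoSuchElementException("Nothing to pick from");
        }
        return items.get(random.nextInt(items.size()));
    }

    public static void main(String[] args) {
        List<String> heroes = Arrays.asList("Yoda", "SuperGirl", "Spock", "Tigra");
        System.out.println(pickRandom(heroes, new Random()));
    }
}
```

Throwing from the helper when the list is empty mirrors the villain() solution, where the exception thrown inside map becomes an error in the stream.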

6.1.5. Implementing the findByName methods

There are 2 findByName methods. Both take a non-null name as a parameter. findByName returns a Maybe, which is empty if no character matches. findByNameOrError returns a Single; if the lookup fails, an error is emitted in the stream.

Open the me.escoffier.lab.chapter5.Code6 class and implement the 2 methods.

package me.escoffier.lab.chapter5;

import io.reactivex.Maybe;
import io.reactivex.Single;
import me.escoffier.superheroes.Character;

public class Code6_Solution extends AbstractSuperAPI {

    public static void main(String[] args) {
        new Code6_Solution().findByName("SuperGirl")
            .subscribe(
                c -> System.out.println(c.getName() + " is a super " + (c.isVillain() ? "villain" : "hero")),
                Throwable::printStackTrace,
                () -> System.out.println("Nope"));

        new Code6_Solution().findByName("Clement")
            .subscribe(
                c -> System.out.println(c.getName() + " is a super " + (c.isVillain() ? "villain" : "hero")),
                Throwable::printStackTrace,
                () -> System.out.println("No, Clement is not a " + "super hero (and not a super villain either despite the rumor)"));

        new Code6_Solution().findByNameOrError("Yoda")
            .subscribe(
                c -> System.out.println(c.getName() + " is a super " + (c.isVillain() ? "villain" : "hero")),
                Throwable::printStackTrace);

        new Code6_Solution().findByNameOrError("Clement")
            .subscribe(
                c -> System.out.println(c.getName() + " is a super " + (c.isVillain() ? "villain" : "hero")),
                t -> System.out.println("The lookup has failed, as expected: Clement is neither a super hero nor a super villain"));
    }

    @Override
    public Maybe<Character> findByName(String name) {
        return load()
            .filter(c -> c.getName().equalsIgnoreCase(name))
            .firstElement();
    }

    @Override
    public Single<Character> findByNameOrError(String name) {
        return load()
            .filter(c -> c.getName().equalsIgnoreCase(name))
            .firstOrError();
    }
}

6.1.6. Implementation done!

We have implemented the first variant of our API. Because we have an RX API as input, the implementation is straightforward. Let’s now look at more complicated cases.

6.2. Building an RX API from another async but non-RX API

In many cases, you will be interacting with non-RX implementations that have their own async variant. Take AsynchronousFileChannel from the JDK, for example: it allows you to read a file asynchronously and requires a listener (a CompletionHandler) from you, which will be notified of success or failure.

Let’s now rebuild the API based on this method. Fortunately, we just have the load method to override.

In this case, you have to bridge an async API with RX. You do that with Single.create, which gives you an emitter object (a SingleEmitter) that you use to push notifications into the Single you’re creating (me.escoffier.lab.chapter5.Code7):

package me.escoffier.lab.chapter5;

import io.reactivex.Flowable;
import io.reactivex.Single;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import me.escoffier.superheroes.Character;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

public class Code7 extends AbstractSuperAPI {

    public static void main(String[] args) throws InterruptedException {
        new Code7()
            .load()
            .subscribe(System.out::println, Throwable::printStackTrace);
        Thread.sleep(2000);
    }

    @Override
    protected Flowable<Character> load() {
        File file = new File("src/main/resources/characters.json");
        return Single.<ByteBuffer>create(emitter -> {
            AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath());
            ByteBuffer buffer = ByteBuffer.allocate((int) file.length());

            channel.read(buffer, 0, null, new CompletionHandler<Integer, Void>() {

                @Override
                public void completed(Integer result, Void attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        emitter.onError(e);
                        return;
                    }
                    emitter.onSuccess(buffer);
                }

                @Override
                public void failed(Throwable error, Void attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        // ignore
                    }
                    emitter.onError(error);
                }
            });
        })
            .map(buffer -> new String(buffer.array(), StandardCharsets.UTF_8))
            .map(JsonArray::new)
            .flatMapPublisher(Flowable::fromIterable)
            .cast(JsonObject.class)
            .map(json -> json.mapTo(Character.class));
    }


}

As you can see it’s a little bit more complex because you have to deal with completion and errors yourself.
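One part of that extra complexity is not handled above: a single asynchronous read is not guaranteed to fill the buffer, so a more defensive bridge re-issues the read until the whole file has arrived. The sketch below shows that loop in plain JDK code. It is an illustration under assumptions, not the lab's solution: the FullAsyncRead class and its methods are hypothetical names, and a CompletableFuture stands in for the SingleEmitter so the example runs without RxJava.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

class FullAsyncRead {

    // Hypothetical helper: reads the whole file and completes the future with its bytes.
    static CompletableFuture<byte[]> readFully(Path path) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        AsynchronousFileChannel channel;
        try {
            channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        } catch (IOException e) {
            result.completeExceptionally(e);
            return result;
        }
        try {
            ByteBuffer buffer = ByteBuffer.allocate((int) channel.size());
            readMore(channel, buffer, result);
        } catch (IOException e) {
            closeQuietly(channel);
            result.completeExceptionally(e);
        }
        return result;
    }

    private static void readMore(AsynchronousFileChannel channel, ByteBuffer buffer,
                                 CompletableFuture<byte[]> result) {
        // Continue at the file position that matches what is already in the buffer
        channel.read(buffer, buffer.position(), null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer bytesRead, Void attachment) {
                if (bytesRead > 0 && buffer.hasRemaining()) {
                    readMore(channel, buffer, result); // partial read: ask for the rest
                    return;
                }
                closeQuietly(channel);
                result.complete(Arrays.copyOf(buffer.array(), buffer.position()));
            }

            @Override
            public void failed(Throwable error, Void attachment) {
                closeQuietly(channel);
                result.completeExceptionally(error);
            }
        });
    }

    private static void closeQuietly(AsynchronousFileChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // nothing useful can be done here
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("characters", ".json");
        Files.write(file, "[{\"name\":\"Yoda\"}]".getBytes(StandardCharsets.UTF_8));
        byte[] bytes = readFully(file).join(); // blocking only to keep the demo short
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

Inside Single.create, the same loop would call emitter.onSuccess and emitter.onError where this sketch completes the future.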

6.3. Building an RX API from a non-async source

We are now in the worst case: we need to implement our API, but we can use neither an underlying RX API nor an asynchronous API. As with Single, it is possible to create the other types (Flowable, Observable, Maybe and Completable) from scratch with an Emitter.

In the next example, we’re going to use Observable.create to push all the files (recursively) found in the specified directory.

Open the me.escoffier.lab.chapter5.Code8 class and fill in the missing code:

package me.escoffier.lab.chapter5;

import io.reactivex.Observable;

import java.io.File;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class Code8_Solution {

    private static final Path DIRECTORY = new File("src/main/resources/super").toPath();

    public static void main(String[] args) {
        getFileNames().subscribe(System.out::println, Throwable::printStackTrace);
    }

    private static Observable<String> getFileNames() {
        return Observable.create(emitter -> {
            Files.walkFileTree(DIRECTORY,
                new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult visitFile(Path path, BasicFileAttributes attr) {
                        emitter.onNext(path.toFile().getName());
                        return FileVisitResult.CONTINUE;
                    }
                });
            emitter.onComplete();
        });
    }
}

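Don’t forget that in this example, you are building a synchronous API: the directory walk runs on the subscribing thread, and subscribe does not return until every file name has been emitted. So you may ask yourself whether that is what you want to achieve. We will see later what you can do about it.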
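A related consequence of the synchronous walk is that it keeps visiting files even when the subscriber has lost interest (for example after disposing the subscription). One way to stop early is to have the visitor return FileVisitResult.TERMINATE once a cancellation check reports true. The plain-JDK sketch below illustrates the idea; CancellableWalk and walk are hypothetical names, and the BooleanSupplier stands in for a check such as emitter::isDisposed, while the Consumer stands in for emitter::onNext.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

class CancellableWalk {

    // Hypothetical helper: pushes file names to onNext until cancelled reports true.
    static void walk(Path root, Consumer<String> onNext, BooleanSupplier cancelled) {
        try {
            Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) {
                    if (cancelled.getAsBoolean()) {
                        return FileVisitResult.TERMINATE; // nobody is listening anymore
                    }
                    onNext.accept(path.getFileName().toString());
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("super");
        Files.createFile(dir.resolve("Yoda"));
        Files.createFile(dir.resolve("Spock"));

        List<String> seen = new ArrayList<>();
        // Stop as soon as one name has been received
        walk(dir, seen::add, () -> !seen.isEmpty());
        System.out.println(seen.size() + " file(s) visited before stopping");
    }
}
```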

6.4. Intricacies of RX APIs

Let us now see some common pitfalls of writing an RX API and how to avoid them.

6.4.1. APIs that need clean-up

Some objects require manual clean-up once they are no longer used. Closing sockets, freeing buffers, returning pooled objects: there are many examples of resources that need to be cleaned up.

When you need to do clean-up after an Observable is finished, you can use the doFinally() method, whose action is called however the stream ends: on completion, on error, or when the subscriber disposes of the subscription.

Open the me.escoffier.lab.chapter5.Code9 class and add the doFinally call that cleans up the directory stream:

package me.escoffier.lab.chapter5;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;

import io.reactivex.Observable;

public class Code9_Solution {

    private static final Path DIRECTORY = new File("src/main/resources/super/heroes").toPath();

    public static void main(String[] args) {
        getHeroesNames().subscribe(System.out::println, Throwable::printStackTrace);
    }

    private static Observable<String> getHeroesNames() {
        DirectoryStream<Path> stream;
        try {
            stream = Files.newDirectoryStream(DIRECTORY);
        } catch (IOException e) {
            return Observable.error(e);
        }
        return Observable.fromIterable(stream)
            .map(path -> path.toFile().getName())
            .doFinally(() -> stream.close());
    }

}

6.4.2. Operation per subscriber

In our previous example, we inadvertently made a common RX mistake: doing the heavy work before returning the Observable. This means that the caller of getHeroesNames pays the cost even if nobody ever subscribes to the returned Observable. This can be acceptable in some cases, but most of the time you will want the work to be done only when someone subscribes to the Observable.

There is another reason to do the work per subscriber: it makes the Observable re-usable by more than one subscriber.

Witness what happens if you subscribe more than once to iterate the files by editing the me.escoffier.lab.chapter5.Code10 class to iterate twice:

package me.escoffier.lab.chapter5;

import io.reactivex.Observable;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class Code10 {

    private static final Path DIRECTORY = new File("src/main/resources/super/heroes").toPath();

    public static void main(String[] args) {
        Observable<String> files = getHeroesNames();
        files.subscribe(value -> System.out.println("Subscriber 1: " + value),
            Throwable::printStackTrace);
        files.subscribe(value -> System.out.println("Subscriber 2: " + value),
            Throwable::printStackTrace);
    }

    private static Observable<String> getHeroesNames() {
        DirectoryStream<Path> stream;
        try {
            stream = Files.newDirectoryStream(DIRECTORY);
        } catch (IOException e) {
            return Observable.error(e);
        }
        return Observable.fromIterable(stream)
            .map(path -> path.toFile().getName())
            .doFinally(stream::close);
    }
}

If you run it, you will get this output:

Subscriber 1: Yoda
Subscriber 1: SuperGirl
Subscriber 1: Spock
Subscriber 1: Tigra
java.lang.IllegalStateException: Directory stream is closed
    at java.base/sun.nio.fs.UnixDirectoryStream.iterator(UnixDirectoryStream.java:114)
    at java.base/sun.nio.fs.UnixDirectoryStream.iterator(UnixDirectoryStream.java:126)
    at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:35)
    at io.reactivex.Observable.subscribe(Observable.java:12005)
    at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:33)
    at io.reactivex.Observable.subscribe(Observable.java:12005)
    at io.reactivex.internal.operators.observable.ObservableDoFinally.subscribeActual(ObservableDoFinally.java:45)
    at io.reactivex.Observable.subscribe(Observable.java:12005)
    at io.reactivex.Observable.subscribe(Observable.java:11991)
    at io.reactivex.Observable.subscribe(Observable.java:11920)
    at me.escoffier.lab.chapter5.Code10.main(Code10.java:19)

This is because we ran the clean-up operation after the first iteration, but we cleaned up an instance of the directory stream that is shared across every subscriber.
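The failure can be reproduced without RX at all, because a DirectoryStream refuses to hand out an iterator once it has been closed. The plain-JDK sketch below mimics the two subscriptions; SingleUseStream and secondIterationFails are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

class SingleUseStream {

    // Returns true if asking a closed directory stream for an iterator fails.
    static boolean secondIterationFails(Path directory) {
        try {
            DirectoryStream<Path> stream = Files.newDirectoryStream(directory);
            for (Path path : stream) {
                System.out.println("First pass: " + path.getFileName());
            }
            stream.close(); // what doFinally does after the first subscriber
            try {
                stream.iterator(); // what the second subscription attempts
                return false;
            } catch (IllegalStateException e) {
                System.out.println("Second pass failed: " + e.getMessage());
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("heroes");
        Files.createFile(dir.resolve("Yoda"));
        System.out.println(secondIterationFails(dir));
    }
}
```

Even without the close call, a DirectoryStream supports only a single iterator, so sharing one instance between subscribers cannot work.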

Edit the me.escoffier.lab.chapter5.Code11 class to use an emitter to allocate and clean up the directory stream for every subscriber:

package me.escoffier.lab.chapter5;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

import io.reactivex.Observable;

public class Code11_Solution {

    private static final Path DIRECTORY = new File("src/main/resources/super/heroes").toPath();

    public static void main(String[] args) {
        Observable<String> files = getHeroesNames();
        files.subscribe(value -> System.out.println("Subscriber 1: " + value),
            Throwable::printStackTrace);
        files.subscribe(value -> System.out.println("Subscriber 2: " + value),
            Throwable::printStackTrace);
    }

    private static Observable<String> getHeroesNames() {
        return Observable.<Path>create(emitter -> {
            // Open a fresh directory stream for each subscriber; try-with-resources
            // closes it even if the iteration fails.
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(DIRECTORY)) {
                for (Path path : stream) {
                    emitter.onNext(path);
                }
            } catch (IOException e) {
                emitter.onError(e);
                return;
            }
            emitter.onComplete();
        }).map(path -> path.toFile().getName());
    }

}

When run, you should see:

Subscriber 1: Yoda
Subscriber 1: SuperGirl
Subscriber 1: Spock
Subscriber 1: Tigra
Subscriber 2: Yoda
Subscriber 2: SuperGirl
Subscriber 2: Spock
Subscriber 2: Tigra

6.4.3. Forcing a resource to be consumed

As we’ve seen previously, in most cases we want to return an RX type that will execute its operation for every subscriber. In some cases, though, you will have to write an API that forces you to allocate resources that have to be consumed (to avoid leaks) before you can return an RX type. Or sometimes you want to create an Observable that can only be subscribed to once.

In cases like these, if you return the RX type, the user can choose to never subscribe, or subscribe more than once. In order to force the user to consume the resource, you can turn things around and require the user to provide you with an Observer instead (me.escoffier.lab.chapter5.Code12):

package me.escoffier.lab.chapter5;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class Code12 {
    private static final Path DIRECTORY = new File("src/main/resources/super/heroes").toPath();


    public static void main(String[] args) {
        getNamesWithForcedConsumption(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String t) {
                System.out.println("File: " + t);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
            }

        });
    }

    private static void getNamesWithForcedConsumption(Observer<String> subscriber) {
        Observable.<Path>create(emitter -> {
            DirectoryStream<Path> stream;
            try {
                stream = Files.newDirectoryStream(DIRECTORY);
            } catch (IOException e) {
                emitter.onError(e);
                return;
            }
            for (Path path : stream)
                emitter.onNext(path);
            stream.close();
            emitter.onComplete();
        })
            .map(path -> path.toFile().getName())
            .subscribe(subscriber);
    }
}

6.5. Porting blocking code to RX

Most of the time you will have legacy code that is blocking and which you need to port to reactive programming. Often, the hard part will be to simply avoid blocking, so let’s see a simple strategy for that.

6.5.1. Example blocking call

Let’s start with an example of a legacy blocking API that takes one second to complete (me.escoffier.lab.chapter5.Code13):

package me.escoffier.lab.chapter5;

import me.escoffier.superheroes.Character;

import java.util.Arrays;

public class Code13 {

    public static void main(String[] args) {
        System.out.println("Before operation");
        Character character = getBlockingSuperVillain();
        System.out.println("After operation: " + character);
    }

    private static Character getBlockingSuperVillain() {
        System.out.println("Operation starting");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Ignore me.
        }
        System.out.println("Operation done");
        return new Character("Frog-Man",
            Arrays.asList("super strength", "leaping", "mega agility", "French"),
            false);
    }

}

Note: Here we’re faking an expensive blocking operation by sleeping, but most of the time those blocking calls will wait for IO or for expensive computations.

6.5.2. When you have a non-blocking alternative

Sometimes it’s easy to replace blocking code by non-blocking code. Either you can use another library that is non-blocking, if the blocking code is not yours, or you can find support in RX, like in our example, because RX has support for delaying code (me.escoffier.lab.chapter5.Code14):

package me.escoffier.lab.chapter5;

import io.reactivex.Single;
import me.escoffier.superheroes.Character;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class Code14 {

    public static void main(String[] args) {
        System.out.println("Before operation");
        getBlockingSuperVillain()
            .subscribe(value -> System.out.println("Operation completed: " + value));
    }

    private static Single<Character> getBlockingSuperVillain() {
        return Single.just(new Character("Frog-Man",
            Arrays.asList("super strength", "leaping", "mega agility", "French"),
            false))
            .delay(1, TimeUnit.SECONDS)
            .doAfterTerminate(() -> System.out.println("Operation done"));
    }

}

Note: OK you’ve caught me: we’ve actually ported a call to sleep that faked expensive code into code that waits for one second with delay but couldn’t possibly do anything else but wait, so it’s really a contrived example.

If you run this code, you will notice that we’re not actually getting notified, so what happened?

6.5.3. Side-note about who executes what

In the next chapter we will cover Schedulers in detail, but to understand them properly, we have to explain what happened in the last code sample.

What happened is that we created a Single with a delay, which is supposed to emit later in the future, and we subscribed to it, but remember that all of that is non-blocking. So RX planned for our Single to emit in the future and returned immediately to the caller, which is the main thread. The main method then returned, and the JVM shut down before the Single had a chance to emit.
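One detail explains why the pending emission did not keep the JVM alive: by default, delay runs its timer on RxJava's computation scheduler, whose worker threads are daemon threads, and the JVM exits once only daemon threads remain. The plain-JDK sketch below imitates that situation with a ScheduledExecutorService backed by a daemon thread. DaemonTimerSketch and emitLater are hypothetical names, and the executor is only a stand-in for the RX scheduler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class DaemonTimerSketch {

    // Schedules a value for later on a daemon thread and optionally waits for it.
    // Returns what has been received by the time the method returns (possibly null).
    static String emitLater(long delayMillis, boolean waitForEmission) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "timer");
            thread.setDaemon(true); // a daemon thread does not keep the JVM alive
            return thread;
        });
        AtomicReference<String> received = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        timer.schedule(() -> {
            received.set("Frog-Man");
            done.countDown();
        }, delayMillis, TimeUnit.MILLISECONDS);
        if (waitForEmission) {
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        timer.shutdown(); // no new tasks; the thread ends once pending work is done
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("Without waiting: " + emitLater(500, false));
        System.out.println("With waiting: " + emitLater(500, true));
    }
}
```

The call that does not wait returns before the timer fires, which is the situation the main method of Code14 is in when it returns.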

In most cases, such as in back-end servers, your code will not run from the main thread and the JVM will not terminate when your function returns. So in order to emulate the JVM not terminating, there are several ways to make it wait for the RX subscriber to be done, like sleeping longer. Open the me.escoffier.lab.chapter5.Code15 class. Thanks to a malicious Thread.sleep we can ask the main thread to wait a little bit longer.

package me.escoffier.lab.chapter5;

import io.reactivex.Single;
import me.escoffier.superheroes.Character;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class Code15 {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Before operation");
        getBlockingSuperVillain()
            .subscribe(value -> System.out.println("Operation completed: " + value));

        Thread.sleep(2000);
    }

    private static Single<Character> getBlockingSuperVillain() {
        return Single.just(new Character("Frog-Man",
            Arrays.asList("super strength", "leaping", "mega agility", "French"),
            false))
            .delay(1, TimeUnit.SECONDS)
            .doAfterTerminate(() -> System.out.println("Operation done"));
    }

}

Now it works and we’ve created our first async API.

For the sake of completeness, let us list several preferable options for blocking the main thread, such as using a CountDownLatch:

package me.escoffier.lab.chapter5;

import io.reactivex.Single;
import me.escoffier.superheroes.Character;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Code16 {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Before operation");
        CountDownLatch latch = new CountDownLatch(1);
        getBlockingSuperVillain()
            .subscribe(value -> {
                System.out.println("Operation completed: " + value);
                latch.countDown();
            });
        System.out.println("After operation");
        latch.await();
    }

    private static Single<Character> getBlockingSuperVillain() {
        return Single.just(new Character("Frog-Man",
            Arrays.asList("super strength", "leaping", "mega agility", "French"),
            false))
            .delay(1, TimeUnit.SECONDS)
            .doAfterTerminate(() -> System.out.println("Operation done"));
    }

}

Or even, ironically, by transforming our Single into a JDK Future and blocking on it (because get is blocking):

package me.escoffier.lab.chapter5;

import io.reactivex.Single;
import me.escoffier.superheroes.Character;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Code17 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("Before operation");
        Character value = getBlockingSuperVillain().toFuture().get();
        System.out.println("Operation completed: " + value);
    }

    private static Single<Character> getBlockingSuperVillain() {
        return Single.just(new Character("Frog-Man",
            Arrays.asList("super strength", "leaping", "mega agility", "French"),
            false))
            .delay(1, TimeUnit.SECONDS)
            .doAfterTerminate(() -> System.out.println("Operation done"));
    }

}

Or simply by calling the Single.blockingGet method:

package me.escoffier.lab.chapter5;

import io.reactivex.Single;
import me.escoffier.superheroes.Character;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class Code18 {

    public static void main(String[] args) {
        System.out.println("Before operation");
        Character value = getBlockingSuperVillain().blockingGet();
        System.out.println("Operation completed: " + value);
    }

    private static Single<Character> getBlockingSuperVillain() {
        return Single.just(new Character("Frog-Man",
            Arrays.asList("super strength", "leaping", "mega agility", "French"),
            false))
            .delay(1, TimeUnit.SECONDS)
            .doAfterTerminate(() -> System.out.println("Operation done"));
    }

}

6.5.4. When you have no choice but to use a thread

Sometimes you just have to use blocking APIs, even though you want to expose an RX API which can’t block the current thread. In this case, you can offload your blocking calls to a new thread, or a thread pool, or a special scheduler, whatever you want.

This can be done by using Single.create which provides you with a SingleEmitter object you can call to notify the Single when you have your item (by calling onSuccess), or an exception (by calling onError).

Open the me.escoffier.lab.chapter5.Code19 class and fill in the blocking code from the previous example that we could not port:

package me.escoffier.lab.chapter5;

import io.reactivex.Single;
import me.escoffier.superheroes.Character;

import java.util.Arrays;

public class Code19_Solution {

    public static void main(String[] args) {
        System.out.println("Before operation");
        Character result = getBlockingSuperVillain().blockingGet();
        System.out.println("After operation: " + result);
    }

    private static Single<Character> getBlockingSuperVillain() {
        return Single.create(emitter ->
            new Thread(() -> {
                System.out.println("Operation starting");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    emitter.onError(e);
                    return;
                }
                emitter.onSuccess(new Character("Frog-Man",
                    Arrays.asList("super strength", "leaping", "mega agility", "French"),
                    false));
            }).start()
        );
    }

}

Using threads like this is not recommended, but you need to be aware of the possibility. The last set of examples has illustrated the issues that arise when dealing with blocking operations. Fortunately, RX offers various solutions to handle these cases. They will be covered in the next chapter.
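In the meantime, if a thread really is needed, a bounded thread pool is one of the alternatives mentioned earlier to starting a new Thread for every subscription. The plain-JDK sketch below shows the shape of such an offload. PoolOffload and offload are hypothetical names, and the two callbacks stand in for emitter::onSuccess and emitter::onError inside Single.create; this is an illustration, not the approach the next chapter recommends.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class PoolOffload {

    // Hypothetical helper: runs a blocking call on the pool and reports through callbacks.
    static <T> void offload(ExecutorService pool, Callable<T> blockingCall,
                            Consumer<T> onSuccess, Consumer<Throwable> onError) {
        pool.execute(() -> {
            T value;
            try {
                value = blockingCall.call();
            } catch (Exception e) {
                onError.accept(e);
                return;
            }
            onSuccess.accept(value);
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> result = new CompletableFuture<>();

        // Same fake blocking operation as in the examples above
        Callable<String> blockingCall = () -> {
            Thread.sleep(1000);
            return "Frog-Man";
        };

        System.out.println("Before operation");
        offload(pool, blockingCall, result::complete, result::completeExceptionally);
        System.out.println("After operation: " + result.join());
        pool.shutdown();
    }
}
```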

6.6. Conclusion

In this chapter, we have seen how to create an RX API that bridges several types of existing APIs, and how to avoid some common mistakes while writing an RX API. In the next chapter, we will talk in more detail about how RX is scheduled.

7. Schedulers and concurrency

When creating an RX API, it is key to realize that the threading model of your API and the threading of the application consuming your API can be different and might interfere:

  • The threading model of your API is defined by the APIs your implementation uses

  • The threading model of the application is defined by the runtime hosting the application

Here are a few examples:

  • your API is implemented with a non-blocking event loop library

  • your API is implemented with JDBC (a thread blocking API)

  • the application runs from a main method

  • the application uses a thread-per-request model in a servlet container

  • the application runs in a non-blocking event loop server

As an API designer, you have two responsibilities:

  • understand the concurrency of your implementation

  • properly document the concurrency of your API

And yes sorry, RX won’t exempt you from writing documentation, quite the opposite actually.

In this chapter, we are going to cover the execution model behind RX and how Schedulers help you manage concurrency. First rule: Schedulers do not schedule, so don’t be confused by the name. But before diving into schedulers, let’s explain how RX emission works. Remember, an emission is when an observed stream pushes a new item to an observer/subscriber.

7.1. Synchronous emissions

Concurrency in RxJava is simple to execute, but somewhat difficult to understand. By default, streams execute work on the caller thread, which is the thread that subscribed to them. In many of our earlier examples, this was the main thread that kicked off our main method. A synchronous emission happens when an Emitter (like the ones used in the previous chapter) is invoked during the subscription. This invocation is made by the thread that is executing the subscription (by calling .subscribe()). This behavior is illustrated in me.escoffier.lab.chapter6.Code1:

package me.escoffier.lab.chapter6;

import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

import static me.escoffier.superheroes.Helpers.log;

public class Code1 {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String[] args) {
        Observable<Object> observable = Observable.create(emitter -> {
            for (String superHero : SUPER_HEROES) {
                log("Emitting: " + superHero);
                emitter.onNext(superHero);
            }
            log("Completing");
            emitter.onComplete();
        });

        log("---------------- Subscribing");
        observable.subscribe(
            item -> log("Received " + item),
            error -> log("Error"),
            () -> log("Complete"));
        log("---------------- Subscribed");
    }
}

Run this example; you should see:

time thread      log
0    main        ---------------- Subscribing
24   main        Emitting: Superman
25   main        Received Superman
25   main        Emitting: Batman
25   main        Received Batman
25   main        Emitting: Aquaman
25   main        Received Aquaman
25   main        Emitting: Asterix
26   main        Received Asterix
26   main        Emitting: Captain America
26   main        Received Captain America
26   main        Completing
26   main        Complete
26   main        ---------------- Subscribed

When running this example, you can see a few important points:

  • the main thread is the only one involved and all operations are executed on this thread

  • each emission is followed exactly by its corresponding reception

  • the next statement after the subscription operation happens after all the emissions - meaning that the stream has been completely consumed before.
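The three columns in that output come from the lab's Helpers.log method, whose implementation is not shown here. As a rough idea of what such a helper could look like, here is a hypothetical stand-in (LogSketch and its methods are assumptions, not the lab's actual code) that prints the elapsed milliseconds, the name of the current thread, and the message.

```java
class LogSketch {

    private static final long START = System.currentTimeMillis();

    // Hypothetical formatter: elapsed milliseconds, thread name, then the message.
    static String format(long elapsedMillis, String threadName, String message) {
        return String.format("%-4d %-11s %s", elapsedMillis, threadName, message);
    }

    // Logs the message with the time elapsed since class loading and the calling thread.
    static void log(String message) {
        System.out.println(format(System.currentTimeMillis() - START,
            Thread.currentThread().getName(), message));
    }

    public static void main(String[] args) throws InterruptedException {
        log("---------------- Subscribing");
        Thread worker = new Thread(() -> log("Emitting: Superman"), "worker");
        worker.start();
        worker.join();
        log("---------------- Subscribed");
    }
}
```

Because the thread name is read at the moment log is called, the second column shows which thread actually executed each step, which is what the rest of this chapter relies on.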

Synchronous emission is not much of a problem when no latency is involved, i.e., when the operations only involve pure computation and do not pause the thread of execution. However, when an operation has latency, the synchronous emission imposes this latency on the subscribing thread, as we can see in this example (me.escoffier.lab.chapter6.Code2).

package me.escoffier.lab.chapter6;

import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

import static me.escoffier.superheroes.Helpers.log;
import static me.escoffier.superheroes.Helpers.sleep;

public class Code2 {

  private static List<String> SUPER_HEROES = Arrays.asList(
      "Superman",
      "Batman",
      "Aquaman",
      "Asterix",
      "Captain America"
  );

  public static void main(String[] args) {
    Observable<Object> observable = Observable.create(emitter -> {
      for (String superHero : SUPER_HEROES) {
        sleep(30); // Introduce fake latency
        log("Emitting: " + superHero);
        emitter.onNext(superHero);
      }
      log("Completing");
      emitter.onComplete();
    });

    log("---------------- Subscribing");
    observable.subscribe(
        item -> log("Received " + item),
        error -> log("Error"),
        () -> log("Complete"));
    log("---------------- Subscribed");
  }

}

This example produces the following output:

0    main        ---------------- Subscribing
58   main        Emitting: Superman
59   main        Received Superman
89   main        Emitting: Batman
89   main        Received Batman
121  main        Emitting: Aquaman
122  main        Received Aquaman
155  main        Emitting: Asterix
156  main        Received Asterix
189  main        Emitting: Captain America
189  main        Received Captain America
190  main        Completing
190  main        Complete
190  main        ---------------- Subscribed

The output is the same as in the previous example, but we can see that the artificial delay in the emission also delays the application code that receives the items. In our example, the stream is very small, so it’s not a big deal. But imagine the same code with a lot more items… The main thread would be blocked for a very long time. Definitely not great.

7.2. Asynchronous emissions

Asynchronous emission happens when an Emitter is invoked asynchronously by a different thread than the subscription thread. For example, open me.escoffier.lab.chapter6.Code3:

package me.escoffier.lab.chapter6;

import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

import static me.escoffier.superheroes.Helpers.log;

public class Code3 {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String[] args) {
        Observable<Object> observable = Observable.create(emitter -> {
            new Thread(() -> {
                for (String superHero : SUPER_HEROES) {
                    log("Emitting: " + superHero);
                    emitter.onNext(superHero);
                }
                log("Completing");
                emitter.onComplete();
            }).start();
        });

        log("---------------- Subscribing");
        observable.subscribe(
            item -> log("Received " + item),
            error -> log("Error"),
            () -> log("Complete"));
        log("---------------- Subscribed");
    }
}

Run this example, you should see:

time thread      log
0    main        ---------------- Subscribing
23   main        ---------------- Subscribed
23   Thread-0    Emitting: Superman
23   Thread-0    Received Superman
23   Thread-0    Emitting: Batman
24   Thread-0    Received Batman
24   Thread-0    Emitting: Aquaman
24   Thread-0    Received Aquaman
24   Thread-0    Emitting: Asterix
24   Thread-0    Received Asterix
24   Thread-0    Emitting: Captain America
25   Thread-0    Received Captain America
25   Thread-0    Completing
25   Thread-0    Complete

With this example:

  • 2 threads are involved, main and Thread-0 created using new Thread(…​).start()

  • the main thread is the application thread

  • the Thread-0 thread is used for emitting the items

  • each emission is immediately followed by its corresponding reception (like before) because the reception runs on the emitter thread

  • the subscribe call returns immediately, before the observable completes

In this situation the application’s main thread is clearly not impacted by the API pauses. However, some parts of the application are now executed on the Thread-0 thread. If that code blocks (for example by calling a blocking method), the emitter thread is blocked too, and it cannot emit another item until it is unblocked. This is very important to understand: when you write long pipelines with multiple stages, if one of the stages blocks, you are blocking the next emission. While that is sometimes what you want, don’t forget that if you are observing a hot stream, the emissions will be buffered somewhere (if you don’t or can’t use back-pressure). This behavior is illustrated in me.escoffier.lab.chapter6.Code4:

package me.escoffier.lab.chapter6;

import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

import static me.escoffier.superheroes.Helpers.log;
import static me.escoffier.superheroes.Helpers.sleep;

public class Code4 {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String[] args) {
        Observable<Object> observable = Observable.create(emitter -> {
            Thread thread = new Thread(() -> {
                for (String superHero : SUPER_HEROES) {
                    log("Emitting: " + superHero);
                    emitter.onNext(superHero);
                }
                log("Completing");
                emitter.onComplete();
            });
            thread.start();
        });

        log("---------------- Subscribing");
        observable
            // Blocking the emission thread
            .doOnNext(x -> sleep(30))
            .subscribe(
                item -> log("Received " + item),
                error -> log("Error"),
                () -> log("Complete"));
        log("---------------- Subscribed");
    }
}

Run this example, you should see:

time thread      log
0    main        ---------------- Subscribing
23   main        ---------------- Subscribed
23   Thread-0    Emitting: Superman
58   Thread-0    Received Superman
58   Thread-0    Emitting: Batman
92   Thread-0    Received Batman
92   Thread-0    Emitting: Aquaman
125  Thread-0    Received Aquaman
125  Thread-0    Emitting: Asterix
159  Thread-0    Received Asterix
160  Thread-0    Emitting: Captain America
190  Thread-0    Received Captain America
190  Thread-0    Completing
191  Thread-0    Complete

7.3. Scheduling operations

OK, let’s now discuss schedulers. RxJava schedulers can change the behavior of the emitter and subscriber threads. Typically, you can choose on which threads the emissions happen. A scheduler is very similar to a Java Executor; it can be seen as a thread pool (most of the time, that is exactly what it is).

RxJava makes concurrency and multi-threading much easier. RxJava handles concurrency for you mainly using two operators: subscribeOn() and observeOn(). Some operators such as flatMap() can be combined with these two operators to create concurrent data processing. But again, don’t be fooled. While RxJava can help you make safe and powerful concurrent applications, you still need to be aware of the traps and pitfalls in multi-threading. The Effective Java book is an excellent resource that every Java developer should have, and it covers best practices for concurrent applications.

7.3.1. subscribeOn

The subscribeOn operation can change the emitter thread (me.escoffier.lab.chapter6.Code5).

package me.escoffier.lab.chapter6;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static me.escoffier.superheroes.Helpers.log;
import static me.escoffier.superheroes.Helpers.threadFactory;

public class Code5 {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String[] args) throws Exception {

        Scheduler scheduler = Schedulers.from(newFixedThreadPool(10, threadFactory));

        CountDownLatch latch = new CountDownLatch(1);

        // Synchronous emission
        Observable<Object> observable = Observable.create(emitter -> {
            for (String superHero : SUPER_HEROES) {
                log("Emitting: " + superHero);
                emitter.onNext(superHero);
            }
            log("Completing");
            emitter.onComplete();
        });

        log("---------------- Subscribing");
        observable
            .subscribeOn(scheduler)
            .subscribe(
                item -> log("Received " + item),
                error -> log("Error"),
                () -> {
                    log("Complete");
                    latch.countDown();
                });
        log("---------------- Subscribed");

        latch.await();
    }
}

Run this example, you should see:

0    main                          ---------------- Subscribing
83   main                          ---------------- Subscribed
84   Scheduler-0                   Emitting: Superman
86   Scheduler-0                   Received Superman
86   Scheduler-0                   Emitting: Batman
87   Scheduler-0                   Received Batman
87   Scheduler-0                   Emitting: Aquaman
88   Scheduler-0                   Received Aquaman
89   Scheduler-0                   Emitting: Asterix
89   Scheduler-0                   Received Asterix
90   Scheduler-0                   Emitting: Captain America
90   Scheduler-0                   Received Captain America
91   Scheduler-0                   Completing
91   Scheduler-0                   Complete

We perform an asynchronous emission like before but instead of managing the thread directly we use the subscribeOn operation to execute the emitter’s subscribe operation on the scheduler.

NOTE: the countdown latch is used to keep the program running until the stream completes.
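To make the role of the latch concrete, here is a minimal plain-JDK sketch with no RxJava involved. It assumes a single-thread executor standing in for the scheduler, and the class name LatchSketch and the thread name pool-worker are made up for the illustration. The whole emission loop runs on the pool thread, and the caller waits on a CountDownLatch that is released by the completion step, much like the onComplete callback in Code5.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch (no RxJava): the emission loop runs on a pool thread,
// and a CountDownLatch keeps the caller waiting until the "stream" completes.
class LatchSketch {

    // Returns log lines of the form "<thread name> <message>".
    static List<String> run(List<String> items) {
        List<String> lines = Collections.synchronizedList(new ArrayList<>());
        // "pool-worker" is a hypothetical thread name chosen for this sketch.
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "pool-worker"));
        CountDownLatch latch = new CountDownLatch(1);

        pool.execute(() -> {
            for (String item : items) {
                lines.add(Thread.currentThread().getName() + " Received " + item);
            }
            lines.add(Thread.currentThread().getName() + " Complete");
            latch.countDown(); // signal completion to the waiting caller
        });

        try {
            // Without this wait, the caller could move on before completion.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(lines);
    }

    public static void main(String[] args) {
        run(Arrays.asList("Superman", "Batman")).forEach(System.out::println);
    }
}
```

Waiting with a timeout rather than an unbounded await() is a design choice of this sketch: if the completion signal never arrives, the caller fails fast instead of hanging forever.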

7.3.2. observeOn

The observeOn operation can change the subscriber thread (me.escoffier.lab.chapter6.Code6).

package me.escoffier.lab.chapter6;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static me.escoffier.superheroes.Helpers.log;
import static me.escoffier.superheroes.Helpers.threadFactory;

public class Code6 {

    private static List<String> SUPER_HEROES = Arrays.asList(
        "Superman",
        "Batman",
        "Aquaman",
        "Asterix",
        "Captain America"
    );

    public static void main(String[] args) {

        Scheduler scheduler = Schedulers.from(newFixedThreadPool(10, threadFactory));

        // Synchronous emission
        Observable<Object> observable = Observable.create(emitter -> {
            for (String superHero : SUPER_HEROES) {
                log("Emitting: " + superHero);
                emitter.onNext(superHero);
            }
            log("Completing");
            emitter.onComplete();
        });

        log("---------------- Subscribing");
        observable
            .observeOn(scheduler)
            .subscribe(
                item -> log("Received " + item),
                error -> log("Error"),
                () -> log("Complete"));
        log("---------------- Subscribed");
    }
}

Run this example, you should see:

0    main                          ---------------- Subscribing
152  main                          Emitting: Superman
157  Scheduler-0                   Received Superman
157  main                          Emitting: Batman
159  Scheduler-1                   Received Batman
159  main                          Emitting: Aquaman
160  Scheduler-2                   Received Aquaman
161  main                          Emitting: Asterix
162  Scheduler-3                   Received Asterix
163  main                          Emitting: Captain America
164  Scheduler-4                   Received Captain America
165  main                          Completing
166  main                          ---------------- Subscribed
166  Scheduler-5                   Complete

We perform a synchronous emission, but thanks to the observeOn operation the subscriber callbacks now run on scheduler threads instead of the main thread.
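The hand-off performed by observeOn can be pictured with a plain-JDK sketch that does not use RxJava. It assumes a single-thread executor playing the role of the scheduler, and the class name HandOffSketch and the thread name consumer are invented for the illustration. The loop producing the items stays on the calling thread, while each notification is submitted to the executor and delivered there. Unlike the 10-thread pool used in Code6, a single thread is used here so that the delivery order is easy to reason about.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch (no RxJava): items are produced on the calling thread,
// but every notification is handed off to another thread for delivery.
class HandOffSketch {

    // Returns the notification lines, each prefixed with the delivering thread.
    static List<String> run(List<String> items) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        // "consumer" is a hypothetical thread name; one thread keeps delivery in order.
        ExecutorService consumer = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));

        for (String item : items) {
            // The loop (the "emission") stays on the caller; only delivery moves.
            consumer.execute(() ->
                received.add(Thread.currentThread().getName() + " Received " + item));
        }
        consumer.execute(() ->
            received.add(Thread.currentThread().getName() + " Complete"));

        consumer.shutdown(); // deliveries that are already queued still run
        try {
            if (!consumer.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for deliveries");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        run(Arrays.asList("Superman", "Batman", "Aquaman")).forEach(System.out::println);
    }
}
```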

7.3.3. Schedulers

In the 2 previous examples, we used a scheduler backed by a fixed thread pool created by a Java executor. RxJava also provides several predefined schedulers:

  • The computation scheduler for computational work such as event-loops and callback processing. This scheduler uses a fixed number of threads based on the processor count available to your Java process, making it appropriate for computational tasks. Computational tasks (such as math, algorithms, and complex stuff) may utilize cores to their fullest extent. Therefore, there is no benefit in having more worker threads than available cores to perform such work.

  • The io scheduler for IO-bound operations. (Blocking) IO tasks such as reading and writing databases, web requests, and disk storage are less expensive on the CPU and often have idle time waiting for the data to be sent or to come back. This means you can create threads more liberally, and Schedulers.io() is appropriate for this use case. It maintains as many threads as there are tasks and grows, caches, and reduces the number of threads as needed. However, in an application with lots of concurrent access, this scheduler can be problematic as it can create too many threads.

  • The new thread scheduler creates a new thread for each task. It does not use a pool and creates a new thread for each observer. The thread is destroyed when the observed stream completes.

  • The single scheduler uses a single thread. It’s appropriate for event-looping. It can be useful when dealing with fragile and non-thread-safe code.

In addition, if you are using Vert.x, it provides additional RxJava schedulers to execute operations within Vert.x thread pools (event loop and worker thread pools).
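The sizing policies described above can be observed with plain JDK executors. The sketch below is only a rough analogy and does not show how RxJava implements its schedulers: it assumes that a fixed pool sized to the core count stands in for the computation scheduler, a cached pool for the io scheduler, and a single-thread executor for the single scheduler. The class name PoolShapes and the helper threadsUsed are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy (not RxJava's implementation): compare how many
// distinct worker threads different pool policies end up using.
class PoolShapes {

    // Runs `tasks` short tasks on the pool, shuts it down, and returns
    // the distinct names of the threads that executed them.
    static Set<String> threadsUsed(ExecutorService pool, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    names.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Fixed pool: never more threads than cores (computation-like).
        System.out.println("fixed:  " + threadsUsed(Executors.newFixedThreadPool(cores), 100).size());
        // Cached pool: grows on demand and reuses idle threads (io-like).
        System.out.println("cached: " + threadsUsed(Executors.newCachedThreadPool(), 100).size());
        // Single thread: every task runs on the same thread (single-like).
        System.out.println("single: " + threadsUsed(Executors.newSingleThreadExecutor(), 100).size());
    }
}
```

The single-thread executor always reports one thread and the fixed pool never reports more than its size. The number reported for the cached pool depends on timing, which is the point the io bullet makes about unbounded thread creation.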

7.4. Schedulers in practice

Now let’s see a couple of examples where we can directly apply schedulers to control the concurrency of our API. In the first exercise, we use java.net.HttpURLConnection to perform an HTTP request to the Super Heroes Service. HttpURLConnection is certainly one of the worst ways to do an HTTP request, but it’s a good example of an API that blocks waiting for IO completion. In me.escoffier.lab.chapter6.Code7, use subscribeOn to change the thread performing the HTTP requests. As it’s clearly an IO task, use the Schedulers.io() scheduler.

package me.escoffier.lab.chapter6;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import io.vertx.core.json.JsonObject;
import me.escoffier.superheroes.SuperHeroesService;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

import static java.util.concurrent.Executors.newFixedThreadPool;
import static me.escoffier.superheroes.Helpers.log;
import static me.escoffier.superheroes.Helpers.threadFactory;

public class Code7_Solution {

    private static final int[] SUPER_HEROES_BY_ID = {641, 65, 37, 142};

    public static void main(String[] args) {

        SuperHeroesService.run(false);

        Scheduler scheduler = Schedulers.from(newFixedThreadPool(10, threadFactory));

        Observable<String> observable = Observable.<String>create(emitter -> {
            for (int superHeroId : SUPER_HEROES_BY_ID) {
                // Load a super hero using the blocking URL connection
                URL url = new URL("http://localhost:8080/heroes/" + superHeroId);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder result = new StringBuilder();
                String line;
                while ((line = rd.readLine()) != null) {
                    result.append(line);
                }
                rd.close();
                String superHero = new JsonObject(result.toString()).getString("name");

                log("Emitting: " + superHero);
                emitter.onNext(superHero);
            }
            log("Completing");
            emitter.onComplete();
        })
            .subscribeOn(Schedulers.io());

        log("---------------- Subscribing");
        observable
            .subscribe(
                item -> log("Received " + item),
                error -> log("Error"),
                () -> log("Complete"));
        log("---------------- Subscribed");
    }
}

You should get an output like:

0    main                          ---------------- Subscribing
28   main                          ---------------- Subscribed
376  RxCachedThreadScheduler-1     Emitting: Superman
376  RxCachedThreadScheduler-1     Received Superman
380  RxCachedThreadScheduler-1     Emitting: Batman
380  RxCachedThreadScheduler-1     Received Batman
397  RxCachedThreadScheduler-1     Emitting: Aquaman
397  RxCachedThreadScheduler-1     Received Aquaman
425  RxCachedThreadScheduler-1     Emitting: Captain America
426  RxCachedThreadScheduler-1     Received Captain America
426  RxCachedThreadScheduler-1     Completing
426  RxCachedThreadScheduler-1     Complete

The second exercise extends the first one but now we assume that the same code is used in an event-loop model. For the example we use Vert.x, but the same reasoning would apply to other event-loop systems or even Android UI. In me.escoffier.lab.chapter6.Code8, the subscription is performed on the Vert.x event-loop thread. Using a blocking computation in this context is forbidden. In addition, the subscriber expects to be notified on the same thread.

Use subscribeOn and observeOn to avoid blocking the event loop and to deliver the subscriber notifications on the same event-loop thread.

package me.escoffier.lab.chapter6;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.RxHelper;
import me.escoffier.superheroes.SuperHeroesService;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

import static me.escoffier.superheroes.Helpers.log;

public class Code8_Solution {

    private static final int[] SUPER_HEROES_BY_ID = {641, 65, 37, 142};

    public static void main(String[] args) {

        Vertx vertx = Vertx.vertx();
        Context context = vertx.getOrCreateContext();
        SuperHeroesService.run(false);

        Scheduler contextScheduler = RxHelper.scheduler(context);

        Observable<String> observable = Observable.<String>create(emitter -> {
            for (int superHeroId : SUPER_HEROES_BY_ID) {
                URL url = new URL("http://localhost:8080/heroes/" + superHeroId);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("GET");
                BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder result = new StringBuilder();
                String line;
                while ((line = rd.readLine()) != null) {
                    result.append(line);
                }
                rd.close();
                String superHero = new JsonObject(result.toString()).getString("name");

                log("Emitting: " + superHero);
                emitter.onNext(superHero);
            }
            log("Completing");
            emitter.onComplete();
        })
            .subscribeOn(Schedulers.io())
            .observeOn(contextScheduler);

        // Execute a Vert.x event loop task
        context.runOnContext(v -> {
            log("---------------- Subscribing");
            observable
                .subscribe(
                    item -> log("Received " + item),
                    error -> log("Error"),
                    () -> log("Complete"));
            log("---------------- Subscribed");
        });
    }
}

You should get an output like:

0    vert.x-eventloop-thread-0     ---------------- Subscribing
20   vert.x-eventloop-thread-0     ---------------- Subscribed
402  RxCachedThreadScheduler-1     Emitting: Superman
405  vert.x-eventloop-thread-0     Received Superman
408  RxCachedThreadScheduler-1     Emitting: Batman
408  vert.x-eventloop-thread-0     Received Batman
418  RxCachedThreadScheduler-1     Emitting: Aquaman
418  vert.x-eventloop-thread-0     Received Aquaman
435  RxCachedThreadScheduler-1     Emitting: Captain America
435  RxCachedThreadScheduler-1     Completing
435  vert.x-eventloop-thread-0     Received Captain America
436  vert.x-eventloop-thread-0     Complete

Notice that each emission is performed on the io scheduler thread and each notification is performed on the same thread that did the subscription, which respects the Vert.x event-loop model.

7.5. Conclusion

We have seen that RxJava subscribeOn and observeOn are powerful tools for controlling the execution thread of emissions and notifications. We used mainly the Observable type, but these apply to all reactive types: Single, Flowable, Completable and Maybe.

Your API should be as consistent as possible across all reactive methods, to respect the principle of least surprise. An application that needs to be notified on a different thread can always use observeOn to choose the scheduler the notifications will take place on.

So when returning a reactive type, you must always document the thread the emissions will take place on as well as the thread the notifications will take place on, to give the API user the opportunity to change this decision.

8. Testing all the things

So far we’ve built pipelines for processing data and events, and we’ve run them all from a traditional Java main method. Of course, you know that unit and integration testing are key ingredients to building sustainable software projects, and it is now time to turn to the test facilities provided by RX Java.

8.1. Testing streams

RX Java pipelines provide a special operator called test() that returns instances of io.reactivex.observers.TestObserver. As the name suggests, a TestObserver provides support for checking what an observable does.

Here is an example (me.escoffier.lab.chapter7.Code01):

package me.escoffier.lab.chapter7;

import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;

public class Code01 {

    public static void main(String[] args) {
        TestObserver<Integer> testObserver = Observable.range(1, 10)
                .filter(n -> n % 2 == 0)
                .test();

        testObserver
                .assertSubscribed()
                .assertNever(n -> n % 2 == 1)
                .assertComplete()
                .assertValueCount(5)
                .assertValues(2, 4, 6, 8, 10);
    }
}

The observable produces the even numbers between 1 and 10. Once we have a TestObserver we can perform different kinds of assertions, like:

  • that the observer has subscribed,

  • that no emitted number is odd,

  • that the completion event was sent,

  • that there are 5 emitted values,

  • that those values are (in order) 2, 4, 6, 8 and 10.

8.2. Testing back-pressured streams

The case of Flowable is a little bit different, as event emission is driven by subscribers rather than the producer (because of the back-pressure protocol). There is also a test method, but it accepts the number of items to request initially, and it returns a TestSubscriber rather than a TestObserver:

package me.escoffier.lab.chapter7;

import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

public class Code02 {

    public static void main(String[] args) {
        Flowable<Integer> flowable = Flowable.range(1, 10)
                .filter(n -> n % 2 == 0);

        TestSubscriber<Integer> testSubscriber = flowable.test(0);

        testSubscriber
                .assertNever(n -> n % 2 == 1)
                .requestMore(2)
                .assertValues(2, 4)
                .assertNotComplete()
                .requestMore(3)
                .assertValues(2, 4, 6, 8, 10)
                .assertComplete();

        testSubscriber = flowable.test(0);

        testSubscriber
                .assertNotComplete()
                .requestMore(2)
                .assertValues(2, 4)
                .cancel();

        testSubscriber
                .assertNotComplete();
    }
}

The requestMore(n) method requests further elements. This is interesting, as we can test what the stream emits in a step-by-step fashion: here we request 2 items, check the values emitted so far and that the stream has not completed yet, then request 3 further items and check completion.

We can also see in this example that we can test what happens when a subscription is being canceled at a very precise point in time, here after 2 events. With non-back-pressured streams, this would be harder to do since a TestObserver cannot control when emissions happen.
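The idea that the subscriber drives the emission can be pictured with a toy model. The sketch below is not the Reactive Streams or RxJava API: DemandDrivenSource and its request, received and isCompleted methods are hypothetical names, and the model is synchronous and single-threaded. It only shows that nothing is emitted until items are requested, and that completion is reached once the source is exhausted, mirroring the steps checked in Code02.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Toy model of demand-driven emission (not the Reactive Streams API):
// nothing is emitted until the consumer asks for it.
class DemandDrivenSource {

    private final Iterator<Integer> items;
    private final List<Integer> received = new ArrayList<>();
    private boolean completed;

    DemandDrivenSource(Iterator<Integer> items) {
        this.items = items;
    }

    // Emits at most n items, then marks completion if the source is exhausted.
    void request(long n) {
        for (long i = 0; i < n && items.hasNext(); i++) {
            received.add(items.next());
        }
        if (!items.hasNext()) {
            completed = true;
        }
    }

    // Snapshot of everything emitted so far.
    List<Integer> received() {
        return new ArrayList<>(received);
    }

    boolean isCompleted() {
        return completed;
    }

    public static void main(String[] args) {
        // Even numbers between 1 and 10, as in Code02.
        DemandDrivenSource source = new DemandDrivenSource(
            IntStream.rangeClosed(1, 10).filter(n -> n % 2 == 0).iterator());

        System.out.println(source.received() + " completed=" + source.isCompleted());
        source.request(2);
        System.out.println(source.received() + " completed=" + source.isCompleted());
        source.request(3);
        System.out.println(source.received() + " completed=" + source.isCompleted());
    }
}
```

Stopping after the first request(2) and never calling request again plays the role of the cancellation in Code02: the remaining items are simply never emitted.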

8.3. Testing errors

Of course, things do not always go according to the plan, and stream processing can end with an error. For all types of streams, we can check the error type, and also perform ad-hoc assertions on the resulting throwable object:

package me.escoffier.lab.chapter7;

import io.reactivex.Single;

import java.io.IOException;

public class Code03 {

    public static void main(String[] args) {
        Single.error(new IOException("Source closed"))
                .test()
                .assertNotComplete()
                .assertError(IOException.class)
                .assertError(t -> t.getMessage().equals("Source closed"));
    }
}

8.4. Taking control of the clock

Some streams are time-sensitive, as they emit events with delays or an interval-based pace. All of these streams need a scheduler to offload work at a later point in time. You could think of putting threads to sleep and wait for timing events to happen, but this is error-prone (non-determinism) and it slows test execution dramatically. A better solution is to use io.reactivex.schedulers.TestScheduler, a scheduler designed for testing. The good thing with TestScheduler is that you are the master of time, and you advance it manually. Here is an example where 2 Single are being zipped, producing a string value about 1 second from the subscription:

package me.escoffier.lab.chapter7;

import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;

public class Code04 {

    public static void main(String[] args) throws Throwable {
        TestScheduler scheduler = new TestScheduler();

        Single<Long> s1 = Single.timer(1, TimeUnit.SECONDS, scheduler);
        Single<String> s2 = Single.just("Hello");
        Single<String> r = Single.zip(s1, s2, (t, s) -> t + " -> " + s);

        TestObserver<String> testObserver = r.test();

        testObserver.assertNoValues();

        scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
        testObserver.assertNoValues();

        scheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);
        testObserver
                .assertNoErrors()
                .assertValue("0 -> Hello");
    }
}

Here we first advance time by 500ms and check that no value has been emitted yet. After advancing by a further 600ms (1100ms in total, which is past the 1 second timer), we check that a value was emitted.

It is interesting to note that this test actually completes fast as time is not clock time.
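To see why such a test does not depend on clock time, here is a minimal sketch of a virtual clock written with the JDK only. It is an illustration of the idea and not how TestScheduler is implemented: the class VirtualClock and its methods schedule, advanceTimeBy and nowMillis are hypothetical. Tasks are stored with a due time, and advancing the clock runs, in order, every task whose due time has been reached.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Minimal virtual-time sketch (not RxJava's TestScheduler): time only
// moves when advanceTimeBy is called, so no thread ever sleeps.
class VirtualClock {

    // A task together with the virtual time at which it becomes due.
    private static final class Timed {
        final long dueAtMillis;
        final long seq; // insertion order, used to break ties
        final Runnable task;

        Timed(long dueAtMillis, long seq, Runnable task) {
            this.dueAtMillis = dueAtMillis;
            this.seq = seq;
            this.task = task;
        }
    }

    private final PriorityQueue<Timed> queue = new PriorityQueue<>(
        Comparator.comparingLong((Timed t) -> t.dueAtMillis).thenComparingLong(t -> t.seq));
    private long nowMillis;
    private long counter;

    // Registers a task to run once the virtual time has advanced by `delay`.
    void schedule(Runnable task, long delay, TimeUnit unit) {
        queue.add(new Timed(nowMillis + unit.toMillis(delay), counter++, task));
    }

    // Moves virtual time forward and runs every task that became due.
    void advanceTimeBy(long amount, TimeUnit unit) {
        long target = nowMillis + unit.toMillis(amount);
        while (!queue.isEmpty() && queue.peek().dueAtMillis <= target) {
            Timed next = queue.poll();
            nowMillis = next.dueAtMillis;
            next.task.run();
        }
        nowMillis = target;
    }

    long nowMillis() {
        return nowMillis;
    }

    public static void main(String[] args) {
        VirtualClock clock = new VirtualClock();
        List<String> events = new ArrayList<>();
        clock.schedule(() -> events.add("tick"), 1, TimeUnit.SECONDS);

        clock.advanceTimeBy(500, TimeUnit.MILLISECONDS);
        System.out.println(events); // prints []
        clock.advanceTimeBy(600, TimeUnit.MILLISECONDS);
        System.out.println(events); // prints [tick]
    }
}
```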

8.5. When you cannot control the clock

Of course, it is not always possible to control the clock with a TestScheduler. A good example is I/O operations: we can’t guarantee latency and we often use timeouts. In such cases, we need to let the test run on clock time. The following example produces string values every 500ms:

package me.escoffier.lab.chapter7;

import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;

import java.util.concurrent.TimeUnit;

public class Code05 {

    public static void main(String[] args) {
        Observable<String> strings = Observable.just("a", "b", "c", "d");
        Observable<Long> ticks = Observable.interval(500, TimeUnit.MILLISECONDS);
        Observable<String> stream = Observable.zip(ticks, strings, (t, s) -> s);

        TestObserver<String> testObserver = stream.test();
        if (testObserver.awaitTerminalEvent(3, TimeUnit.SECONDS)) {
            testObserver
                    .assertNoErrors()
                    .assertComplete()
                    .assertValues("a", "b", "c", "d");
            System.out.println("Cool!");
        } else {
            System.out.println("It did not finish on time");
        }
    }
}

Since scheduling is best-effort, we don’t have strong guarantees on the exact timing of events, so one approach is to wait a bit longer than the expected duration. We have to wait because the emissions come from another thread, the one driving the ticks stream.

The awaitTerminalEvent method waits for a completion or an error to happen. If the wait exceeds the given duration, it returns false and we know that we have a timeout. It is possible to use other await… methods, like waiting only for completion, or for a number of events.

This is a much better approach than using a Thread.sleep(t)…​

8.6. Putting it all together in JUnit

We are going to test an HTTP request to a third-party service that returns 520 superheroes. This is all encapsulated in the me.escoffier.superheroes.Helpers class. A naive way to test is to assume traditional synchronous semantics and just observe what the stream emits:

package me.escoffier.lab.chapter7;

import me.escoffier.superheroes.Helpers;
import me.escoffier.superheroes.Character;
import org.junit.Test;

import java.util.ArrayList;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class BadAsyncTest {

    private static class TBox {
        private Throwable t;

        void set(Throwable t) {
            this.t = t;
        }
    }

    @Test
    public void theWrongWayToTest() throws Throwable {
        TBox box = new TBox();
        ArrayList<Character> stuffs = new ArrayList<>();

        Helpers.heroes()
                .subscribe(stuffs::add, box::set, () -> System.out.println("[ done ]"));

        // Thread.sleep(5000);

        if (box.t != null) {
            throw box.t;
        }
        assertThat(stuffs.size(), equalTo(520));
    }
}

  1. What happens when you run the test?

  2. Why is un-commenting the Thread.sleep(5000) statement a fix, albeit a dirty fix?

Using your knowledge of TestSubscriber, rewrite the test with the RX Java testing goodies in a cleaner fashion:

package me.escoffier.lab.chapter7;

import io.reactivex.subscribers.TestSubscriber;
import me.escoffier.superheroes.Helpers;
import me.escoffier.superheroes.Character;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncTest_Solution {

    @Test
    public void theRightWayToTest() throws TimeoutException {
        TestSubscriber<Character> testSubscriber = Helpers.heroes().test();
        if (!testSubscriber.awaitTerminalEvent(5, TimeUnit.SECONDS)) {
            throw new TimeoutException("It timed out!");
        }
        testSubscriber
                .assertComplete()
                .assertNoErrors()
                .assertValueCount(520);
    }
}

9. Conclusion

Here we are…​ it will be time to part. But before this, I would like to tell you a few more things.

With the reactive trend, being able to provide asynchronous and reactive API is becoming more and more important. In this lab, we have seen one way to build such an API with RX Java. It’s not the only way but it’s a popular way. RX Java gives you superpowers to build asynchronous and concurrent applications. But as a superhero, you know "with great power comes great responsibility".

Don’t believe the agile gurus who tell you that documentation is useless. When dealing with asynchronous code, documentation, and Javadoc in particular, is essential. Each method must be documented, and a few aspects must be described:

  • Errors: how errors are propagated.

  • Back-pressure: whether the method supports back-pressure and, if so, how the back-pressure protocol behaves.

  • Scheduler: on which scheduler the method runs, and whether it can be configured.

For example, the delay operator is documented as follows:

@CheckReturnValue
@BackpressureSupport(value=FULL)
@SchedulerSupport(value="none")
public final <U> Flowable<T> delay(Function<? super T,? extends Publisher<U>> itemDelayIndicator)

Returns a Flowable that delays the emissions of the source Publisher via another Publisher on a per-item basis.

Note: the resulting Publisher will immediately propagate any onError notification from the source Publisher.

Backpressure: The operator doesn't interfere with the backpressure behavior which is determined by the source Publisher. All of the other Publishers supplied by the function are consumed in an unbounded manner (i.e., no backpressure applied to them).

Scheduler: This version of delay does not operate by default on a particular Scheduler.

Type Parameters:
U - the item delay value type (ignored)

Parameters:
itemDelayIndicator - a function that returns a Publisher for each item emitted by the source Publisher, which is then used to delay the emission of that item by the resulting Publisher until the Publisher returned from itemDelay emits an item

Returns: a Flowable that delays the emissions of the source Publisher via another Publisher on a per-item basis

See Also: ReactiveX operators documentation: Delay
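
Applied to your own API, the three aspects listed earlier could be documented along these lines. This is only a sketch: HeroCounter and countByPrefix are invented for illustration, and to stay free of dependencies the method returns a JDK CompletableFuture rather than an RX Java type, so the back-pressure entry simply states that it does not apply.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical API, not part of the lab: shows the three aspects in Javadoc form.
class HeroCounter {

    /**
     * Counts the names that start with the given prefix.
     *
     * <p><b>Errors:</b> a {@code null} prefix is rejected immediately with a
     * {@link NullPointerException} thrown to the caller. Any failure during the
     * computation (for example a {@code null} list) completes the returned
     * future exceptionally.
     *
     * <p><b>Back-pressure:</b> not applicable, the future completes with at
     * most one value.
     *
     * <p><b>Scheduler:</b> the computation runs on the supplied
     * {@code executor}; the caller chooses the threading model.
     *
     * @param names    the names to inspect
     * @param prefix   the prefix to match, must not be {@code null}
     * @param executor the executor running the computation
     * @return a future completed with the number of matching names
     */
    static CompletableFuture<Long> countByPrefix(
            List<String> names, String prefix, Executor executor) {
        Objects.requireNonNull(prefix, "prefix");
        return CompletableFuture.supplyAsync(
                () -> names.stream().filter(n -> n.startsWith(prefix)).count(),
                executor);
    }
}
```

Because the executor is a parameter, a test can pass Runnable::run to run the computation inline, while production code can hand in a thread pool.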

RX Java offers a large number of operators. You can also implement your own. However, this is a complicated task, so before you do, check the following projects, which provide custom operators for RX Java 2:

This time, it’s the end. I hope you enjoyed this lab. Of course, comments, feedback and pull requests are more than welcome.