1. Preface

Let’s start from the beginning…​. Vert.x. What’s Vert.x? That’s a pretty good question, and probably a good start. If you go on the Vert.x web site, Vert.x is defined as "a toolkit for building reactive applications on the JVM". This description is rather unclear, right? What’s a toolkit? What are reactive applications? In this lab, we are going to explain these words, and build an application using Vert.x illustrating what Vert.x is. This application is going to be composed of microservices. Another buzzword that is currently trending, right ? Actually, Vert.x was already promoting microservices before anyone else.

The developed application is going to be:

  • based on Vert.x (that’s why you are here, right?)

  • distributed

  • built as a reactive system

  • (a bit) fun

This lab offers attendees an intro-level, hands-on session with Vert.x, from the first line of code, to making services, to consuming them and finally to assembling everything in a consistent reactive system. It illustrates what reactive systems are, what reactive programming is, and how to build applications based on reactive microservices (and the s is important).

This is a BYOL (Bring Your Own Laptop) session, so bring your Windows, OSX, or Linux laptop. You need JDK 8+ on your machine, and Apache Maven (3.3+).

What you are going to learn:

  • What Vert.x is and how to use its asynchronous non-blocking development model

  • How to develop microservices with Vert.x with several types of services, and service discovery

  • What verticles are and how to use them

  • How to use the Vert.x event bus to send and receive messages

  • How to expose HTTP endpoints with Vert.x, and also how to consume them

  • How to compose asynchronous actions

  • How to use several languages in the same application

  • How to use databases with Vert.x

  • How to manage failures with async results, futures, exception handlers and circuit breakers

And many more…​

2. Vert.x

We will attempt to explain Vert.x in just a few lines. Remember that we said in the previous section that Vert.x is "a toolkit for building reactive applications on the JVM".

There are a three important points in this description: toolkit, reactive and "on the JVM".

Firstly, Vert.x is a toolkit. Meaning, Vert.x is not an application server, a container nor a framework. It’s not a JavaScript library either. Vert.x is a plain old jar file, so a Vert.x application is an application that uses this jar file. Vert.x does not define a packaging model, all Vert.x components are plain boring jar files. How does this impact you and your application? Let’s imagine you are using a build tool such as Maven or Gradle, to make your application a Vert.x application just add the vertx-core dependency. Wanna use another Vert.x components, just add it as a dependency. It’s simple, burden-less. Starting the application is a simple class with the public static void main(String[] args) entry point. No specific IDE or plugin to install to start using Vert.x.

Therefore, to use the awesomeness provided by Vert.x, you just need to use it in your code, but be patient, this will be covered later.

Secondly, Vert.x is reactive. It is specifically made to build reactive applications, or more appropriately, systems. Reactive systems [1] has been defined in the Reactive Manifesto. Although, it’s not long document to read, we will further reduce it to these 4 bullet points:

  • Responsive: a reactive system needs to handle requests in a reasonable time (I let you define reasonable).

  • Resilient: a reactive system must stay responsive in the face of failures (crash, timeout, 500 errors…​), so it must be designed for failures and deal with them appropriately.

  • Elastic: a reactive system must stay responsive under various loads. As a consequence, it must scale up and down, and be able to handle the load with minimal resources.

  • Message driven: components from a reactive system interacts using asynchronous message-passing.

Also, Vert.x is event-driven and also non-blocking. Events are delivered in an event loop that must never be blocked. Let’s explain why. Unlike traditional, let’s say "enterprise", systems, Vert.x uses a very small number of threads. Some of these threads are event loops, they are responsible for dispatching the events in Handlers. If you block this thread, the events won’t be delivered anymore. This execution model impacts how you write your code, instead of the traditional mofrl of blocking code, your code is going to be asynchronous [2] and non-blocking [3].

As an example, if we wanted to retrieve a resoure from a URL, we would do something like this:

URL site = new URL("http://vertx.io/");
BufferedReader in = new BufferedReader(new InputStreamReader(site.openStream()));

String inputLine;
while ((inputLine = in.readLine()) != null) {
  System.out.println(inputLine);
}
in.close();

But with Vert.x we are more likely to do:

vertx.createHttpClient().getNow(80, "vertx.io", "", response -> {
  response.bodyHandler(System.out::println);
});

The main differences between these 2 codes are:

  • the first one is synchronous and potentially blocking : the instructions are executed in order, and may block the thread for a long time (because the web site may be slow or whatever).

  • the Vert.x one is asynchronous and non-blocking: the thread (event loop) is released while the connection with the HTTP server is established and so can do something else. When the response has been received, the same event loop calls the callback. Most of the Vert.x components are single-threaded (accessed only by a single thread), so no concurrency burden anymore. By the way, with Vert.x, even the DNS resolution is asynchronous and non-blocking (while Java DNS resolution is blocking).

Finally, Vert.x applications runs "on the JVM" the Java Virtual Machine (8+). This means Vert.x applications can be developed using any language that runs on the JVM. Including Java(of course), Groovy, Ceylon, Ruby, JavaScript etc. We can even mix and match any combination of all these languages. The polyglot nature of Vert.x application allows you use the most appropriate language for the task.

Vert.x lets you implement distributed applications, either by using the built-in TCP and HTTP server and client, but also using the Vert.x event bus, a lightweight mechanism to send and receive messages. With the event bus, you send messages to addresses. It supports three modes of distributions:

  1. point to point: the message is sent to a single consumer listening on the address

  2. publish / subscribe: the message is received by all the consumers listening on the address

  3. request / reply: the message is sent to a single consumer and let it reply to the message by sending another message to the initial sender

Wao!, that’s a lot of information to process…​ However, you might still want to ask: What kind of applications can I use Vert.x for? We say, Vert.x is incredibly flexible - whether it’s simple network utilities, sophisticated modern web applications, HTTP/REST microservices, high volume event processing or a full blown backend message-bus application, Vert.x is a great fit. It’s fast, and does not constraint you. Last but not least, Vert.x provides appropriate tools to build reactive systems; sytems that are: responsive, elastic, resilient and asynchronous!

3. Demystifying microservices

Except you spent the last year in a cave, you probably have heard about microservices. So what are microservices? To answer this questions, let’s quote from a veteran:

The microservice architectural style is an approach to developing a single application as a suite of small services, each running in its own process and communicating with lightweight mechanisms, often an HTTP resource API. These services are built around business capabilities and independently deployable by fully automated deployment machinery. There is a bare minimum of centralized management of these services, which may be written in different programming languages and use different data storage technologies.
— Martin Fowler
http://martinfowler.com/articles/microservices.html

Microservice is an architectural style, which means it is a specialization of element and relation types, together with constraints and how they can be used [4]. I believe by now, I have left you more confused than you before we started.

Not to worry. Let’s take another approach. Why do we need microservices? In one word: agility [5]. Let’s imagine, we have an application, rather large. As a large application, the maintenance is a nightmare, adding features take too much time, the technology used is very outdated (What? Corba is not cool anymore?), any change needs to pass a 50-steps process and be validated by 5 levels of management. Obviously there are several teams on the application with different requirements and agendas. Well, we have such a monster app. How could we make the development and maintenance of this application efficient? Microservices are one answer to this question. It aims to reduce the time to production.

To do that end, the microservice architectural style proposes to:

  1. split the application into a set of decoupled components providing defined services (defined means with a known interface or API)

  2. allow the components communicate with whatever protocol the choose, often REST, but not necessarily

  3. allow the components use whatever languages and technologies they want

  4. allow each component be developed, released and deployed independently

  5. allow the deployments be automated in their own pipeline

  6. allow the orchestration of the whole application be reduced to the barest minimum

In this lab, we won’t address point 5, but you should know that Vert.x does not restricts how you deploy your components. You can employ whatever technology best suites your environment. whether it is ssh, ansible, puppet, docker, cloud, fabric8, or even floppy disk.

Point 6, however, is interesting, and often misunderstood. It’s pretty cool to develop independent pieces of software that magically interact at runtime. Yes, I said magically but in technology we don’t believe in magic. To make this happen, what we need is some form of service discovery at runtime. The service discovery mechanism can achieve it’s goal with any number of suitable means. This range from; hardcoding the service location in the code (which is generally a bad idea), using a DNS lookup service, or some more advanced techniques.

Having a service discovery mechanism allows our system components interact transparently amongst each other regardless of location or envoronment. It also allows us to easily load-balance amongst our components through a round robin alogorithm, for example, thereby making our system more fault-tolerant (by locating another service provider when one breaks down).

Although by definition, microservice applications are not required to be distributed, there usually are in practice. This comes with all the distributed application benefits and constraints: consensus computation (FLP), CAP theorem, consistency, monitoring, and many other reasons to fail. So microservices applications need to be designed to accomodate failures from their early implementation stage.

Before we go further, there are a couple of points I would like to mention. Microservices are not new and the concept is not rocket science. Academic papers from the 70’s and 80’s have defined (using different words) architectural styles very close to this. Also very important point to understand is: microservices are not a silver bullet. (Unless well managed) it has the capacity to increase the complexity of your application due to its distributed nature. Lastly, a microservice architecture will not fix all your issues.

The major concerns when it comes microservices are rapid delivery, adaptation, independence and replaceability. Every microservice is made to be replaceable by another providing the same service / API / interface (at the core, it’s basically an application of the Liskov substitution principle).

If you have been a developer for about 10 years, you might want to ask what difference is between microservices and SOA. For a lot of people it’s about size. This is not always true because services don’t necessarilly have to be small which makes the term "microservice" quite misleading. Microservices and SOA differ purpose but the basic concepts are similar:

  • service : a defined feature accessible by an API, a client, a proxy, anything

  • service provider: a component implementing a service

  • service consumer: a component consuming a service

  • service discovery: the mechanism used by a consumer to find a provider

Both approaches inherit from the service oriented computing, aiming to decompose and manage independent pieces of software. You may have never heard about this even if you used it: COM, Corba, Jini, OSGi, and web services are all different implementations of service oriented computing.

Finally, there is a common misconception that microservices must be RESTful by nature. This can’t be farther from the truth. Microservices can employ any number interaction style that best fit their purpose: RPC, events, messages, streams etc. In this lab we will using RESTful services, async RPC, and message sources. ## The Micro-Trader Application

Now that we know more about Vert.x and microservices, it’s time to discuss the application we are going to develop in this lab.

It’s going to be a fake financial app, where we will be making (virtual) money. The application is composed of a set of microservices:

  • The quote generator - this is an absolutely unrealistic simulator that generates the quotes for 3 fictional companies MacroHard, Divinator, and Black Coat. The market data is published on the Vert.x event bus.

  • The traders - these are a set of components that receives quotes from the quote generator and decides whether or not to buy or sell a particular share. To make this decision, they rely on another component called the portfolio service.

  • The portfolio - this service manages the number of shares in our portfolio and their monetary value. It is exposed as a service proxy, i.e. an asynchronous RPC service on top of the Vert.x event bus. For every successful operation, it sends a message on the event bus describing the operation. It uses the quote generator to evaluate the current value of the portfolio.

  • The audit - that’s the legal side, you know…​ We need to keep a list of all our operations (yes, that’s the law). The audit component receives operations from the portfolio service via an event bus and address . It then stores theses in a database. It also provides a REST endpoint to retrieve the latest set of operations.

  • The dashboard - some UI to let us know when we become rich.

Let’s have a look at the architecture:

Micro-Trader Architecture

The application uses several types of services:

  • HTTP endpoint (i.e. REST API) - this service is located using an HTTP URL.

  • Service proxies - these are asynchronous services exposed on the event bus using an RPC interaction mechanism, the service is located using an (event bus) address.

  • Message sources - these are components publishing messages on the event bus, the service is located using an (event bus) address.

These components runs in the same network (in this lab they will be on the same machine, but in different processes).

The dashboard presents the available services, the value of each company’s quotes, the latest set of operations made by our traders and the current state of our portfolio. It also shows the state of the different circuit breakers.

Dashboard

We are going to implement critical parts of this application. However, the rest of the code is provided to illustrate some other Vert.x features. The code that needs to be written by us is indicated using TODO and wrapped as follows:

//TODO
// ----
// your code here
// ----

4. Prerequisites

We will get to the good stuff, coding all the way soon…​ But before we start, we need to install a couple of software on our machine.

4.1. Hardware

  • Operating System: Mac OS X (10.8+), Windows 7+, Ubuntu 12+, CentOS 7+, Fedora 22+

  • Memory: At least 4 GB+, preferred 8 GB

4.2. Java Development Kit

We need a JDK 8+ installed on our machine. Latest JDK can downloaded from:

You can use either Oracle JDK or OpenJDK.

4.3. Maven

4.4. IDE

We recommend you to use an IDE. You can use Eclipse, IntelliJ or Netbeans.

4.4.1. No IDE ?

If you don’t have an IDE, here are the step to get started with Eclipse.

  1. First download Eclipse from the download page.

  2. In the Eclipse Package list, select Eclipse IDE for Java Developers. It brings you to a download page with a Download button.

  3. Once downloaded, unzip it.

  4. In the destination directory, you should find an Eclipse binary that you can execute.

  5. Eclipse asks you to create a workspace.

  6. Once launched, click on the Workbench arrow (top right corner).

5. Let’s start !

5.1. Getting the code

git clone https://github.com/cescoffier/vertx-microservices-workshop

You can import the code in your IDE as a Maven project. You can refer to your IDE documentation to know how to import Maven projects.

For Eclipse:

  1. Click on File - Import …​ - Maven - Existing Maven Projects

  2. Select the location where you cloned the sources

  3. Click Finish and wait…​

You are going to see a couple of compilation errors. This is because Eclipse does not mark the src/main/generated directories as source root by default. Right-click on portfolio-service/src/main/generated and select Build Path → Use as Source Folder.

A complete solution is given in the solution directory.

5.2. Build

The build is managed using Apache Maven. Compile everything using:

cd vertx-microservices-workshop
mvn clean install

As said previously, Vert.x does not depend on Apache Maven. You can use any build tool (or even no build tool).

5.3. Clustering and Service Discovery infrastructure

It’s about time to start. If you remember the microservice section, we need some service discovery mechanism. Fortunately, Vert.x offers a service discovery mechanism. To let every component discover the services, it needs to store the service record in a location that every one can access.

By default, Vert.x service discovery is going to use a distributed map accessible by all the members of the cluster. So, when you start your Vert.x application and enable the cluster mode, it joins a cluster. This cluster lets nodes:

  • to come and go, and so manage member discovery (member != service)

  • to share data such as distributed maps, locks, counters…​

  • to access the event bus

In our context, you don’t need to configure anything. The lab provides a cluster configuration using unicast and the 127.0.0.1 IP (so you won’t see the services from your neighbors). The cluster is built on top of Hazelcast. You can check the configuration in vertx-workshop-common/src/main/resources/cluster.xml.

6. The first microservice - the quote generator

Disclaimer, the quote generator is unrealistic, and is basically a set of randoms…​

6.1. Project structure

Let’s have a look to the project, as every other project are structured the same way.

.
├── README.md <--- component description
├── pom.xml <--- the Maven file
└── src
    ├── conf
    │   └── config.json <--- a configuration file that is passed when the application starts
    └── main
        └── java
            └── io.vertx.workshop.quote
                            ├── GeneratorConfigVerticle.java <--- the verticles
                            ├── QuoteVerticle.java
                            └── RestQuoteAPIVerticle.java

Let’s start with the pom.xml file. This file specifies the Maven build:

  1. Define the dependencies

  2. Compile the java code and process resources (if any)

  3. Build a fat-jar

A fat-jar (also called uber jar or shaded jar) is a convenient way to package a Vert.x application. It creates a über-jar containing your application and all its dependencies, including Vert.x. Then, to launch it, you just need to use java -jar …​., without having to handle the CLASSPATH. Wait, I told you that Vert.x does not dictate a type of packaging. It’s true, fat jars are convenient, but it’s not the only way, you can use plain (not fat) jars, OSGi bundles…​

The created fat-jar is configured to use a main class provided in vertx-workshop-common (io.vertx.workshop .common.Launcher). This Launcher class creates the Vert.x instance, configure it and deploys the main-verticle. Again, it’s just a convenient way, you can use your own main class.

6.2. Verticle

As you may have noticed, the code is structured in 3 verticles, but what are these? Verticles is a way to structure Vert.x application code. It’s not mandatory, but it is quite convenient. A verticle is a chunk of code that is deployed on top of a Vert.x instance. A verticle has access to the instance of vertx on which it’s deployed, and can deploy other verticles.

Let’s open the GeneratorConfigVerticle class, and look at the start method:

@Override
public void start() {
    super.start();

    JsonArray quotes = config().getJsonArray("companies");
    for (Object q : quotes) {
      JsonObject company = (JsonObject) q;
      // Deploy the verticle with a configuration.
      vertx.deployVerticle(MarketDataVerticle.class.getName(),
         new DeploymentOptions().setConfig(company));
    }

    vertx.deployVerticle(RestQuoteAPIVerticle.class.getName());

    publishMessageSource("market-data", ADDRESS, rec -> {
      if (!rec.succeeded()) {
        rec.cause().printStackTrace();
      }
      System.out.println("Market-Data service published : " + rec.succeeded());
    });

    publishHttpEndpoint("quotes", "localhost", config().getInteger("http.port", 8080), ar -> {
      if (ar.failed()) {
        ar.cause().printStackTrace();
      } else {
        System.out.println("Quotes (Rest endpoint) service published : " + ar.succeeded());
      }
    });
}

Verticles can retrieve a configuration using the config() method. Here it gets the details about the companies to simulate. The configuration is a JsonObject. Vert.x heavily uses JSON, so you are going to see a lot of JSON in this lab. For each company found in the configuration, it deploys another verticle with the extracted configuration. Finally, it deploys another verticle providing a very simple HTTP API.

The last part of the method is about the service discovery mentioned in the microservice section. This component generates quotes sent on the event bus. But to let other components discover where the messages are sent (where means on which address), it registers it. market-data is the name of the service, ADDRESS is the event bus address on which the messages are sent. The last argument is a Handler that is notified when the registration has been completed. The handler receives a structure called AsyncResult.

Remember, Vert.x is promoting an asynchronous, non-blocking development model. Publishing the service may take time (actually it does as it creates a record, write it to the backend, and notifies everyone), as we cannot block the event loop, the method is asynchronous. Asynchronous methods have a Handler as last parameter that is invoked when the operation has been completed. This Handler is called with the same event loop as the one having called the async method. As the asynchronous operation can fail, the Handler receives as parameter an AsyncResult telling whether or not the operation has succeeded. You will see the following patterns a lot in Vert.x applications:

 // Asynchronous method returning an object of type X
 operation(param1, param2, Handler<AsyncResult<X>>);

 // Handler receiving an object of type X

 ar -> {
   if (ar.succeeded()) {
      X x = ar.result();
      // Do something with X
   } else {
      // it failed
      Throwable cause = ar.cause();
   }
 }

If you remember the architecture, the quote generator also provides a HTTP endpoint returning the last values of the quotes (but, you are going to work on it). As the previous service, it needs to be published. For the publication it gives details on its locations (host, port…​):

publishHttpEndpoint("quotes", "localhost", config().getInteger("http.port", 8080), ar -> {
  if (ar.failed()) {
    ar.cause().printStackTrace();
  } else {
    System.out.println("Quotes (Rest endpoint) service published : " + ar.succeeded());
  }
});

6.3. The quote REST endpoint

It’s time for you to develop some parts of the application (I know you have pins and needles in your fingers). Open the RestQuoteAPIVerticle. It’s a verticle class extending AbstractVerticle. In the start method you need to:

  1. Register an event bus consumer to collect the last quotations (in the quotes map)

  2. Handle HTTP requests to return the list of quotes, or a single quote if the name (query) param is set.

Let’s do that…​.

6.3.1. Task - Implementing a Handler to receive events

The first action is about creating a Handler, so a method that is invoked on event. Handlers are an important part of Vert.x, so it’s important to understand what they are.

In this task, the Handler is going to be called for each message sent on the event bus on a specific address (receiving each quote sent by the generator). It’s a message consumer. The message parameter is the received message.

Implement the logic that retrieve the body of the message (with the body() method. Then extract from the body the name of the quote and add an entry name → quote in the quotes map.

JsonObject quote = message.body(); // 1
quotes.put(quote.getString("name"), quote); // 2

First, it retrieves the message body (1). It’s a JSON object, and stores it in the quotes map (2).

6.3.2. Task - Implementing a Handler to handle HTTP requests

Now that you did a first Handler, let’s do a second one. This handler does not receive messages from the event bus, but HTTP requests. The handler is attached to a HTTP server and is called every time there is a HTTP request to the server, the handler is called and is responsible for writing the response.

To handle the HTTP requests, we need a HTTP server. Fortunately, Vert.x lets you create HTTP servers using:

vertx.createHttpServer()
    .requestHandler(request -> {...})
    .listen(port, resultHandler);

Write the content of the request handler to respond to the request:

  1. a response with the content-type header set to application/json

  2. retrieve the name parameter (it’s the company name)

  3. if the company name is not set, return all the quotes as json.

  4. if the company name is set, return the stored quote or a 404 response if the company is unknown

The response to a request is accessible using request.response()
To write the response use response.end(content).
To create the JSON representation of an object, you can use the Json.encode method
HttpServerResponse response = request.response()    (1)
    .putHeader("content-type", "application/json");
String company = request.getParam("name");          (2)
if (company == null) {
    String content = Json.encodePrettily(quotes);   (3)
    response
        .end(content);                              (4)
 } else {
    JsonObject quote = quotes.get(company);
    if (quote == null) {
      response.setStatusCode(404).end();            (5)
    } else {
      response.end(quote.encodePrettily());
    }
 }
1 Get the response object from the request
2 Gets the name parameter (query parameter)
3 Encode the map to JSON
4 Write the response and flush it using end(…​)
5 If the given name does not match a company, set the status code to 404

You may wonder why synchronization is not required. Indeed we write in the map and read from it without any synchronization constructs. Here is one of the main feature of Vert.x: all this code is going to be executed by the same event loop, so it’s always accessed by the same thread, never concurrently.

The Map<String, JsonObject> could be replaced by a simple JsonObject as they behave as a Map<String, Object>.

6.4. Time to start the quote generator

First, let’s build the microservice fat-jar. In the terminal, execute:

cd quote-generator
mvn package

Then, open a new terminal and launch:

java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar

This command launch the application. The main class we used creates a clustered Vert.x instance and read the configuration from src/conf/config.json. This configuration provides the HTTP port on which the REST service is published (35000).

Let’s now open a browser and have a look to http://localhost:35000.

It should return something like:

{
  "MacroHard" : {
    "volume" : 100000,
    "shares" : 51351,
    "symbol" : "MCH",
    "name" : "MacroHard",
    "ask" : 655.0,
    "bid" : 666.0,
    "open" : 600.0
  },
  "Black Coat" : {
    "volume" : 90000,
    "shares" : 45889,
    "symbol" : "BCT",
    "name" : "Black Coat",
    "ask" : 654.0,
    "bid" : 641.0,
    "open" : 300.0
  },
  "Divinator" : {
    "volume" : 500000,
    "shares" : 251415,
    "symbol" : "DVN",
    "name" : "Divinator",
    "ask" : 877.0,
    "bid" : 868.0,
    "open" : 800.0
  }
}

It gives the current details of each quotes. The data is updated every 3 seconds, so refresh your browser to get the latest data.

Let’s now launch the dashboard. In another terminal, navigate to $project-home/trader-dashboard and execute:

mvn clean package
java -jar target/trader-dashboard-1.0-SNAPSHOT-fat.jar

Then, open your browser to http://localhost:8080. You should see:

Dashboard

Some parts have no content, and it’s expected as it’s just the beginning…​

6.5. You are not a financial expert ?

So maybe you are not used to the financial world and words…​ Neither am I, and this is a overly simplified version. Let’s define the important fields:

  • name : the company name

  • symbol : short name

  • shares : the number of stock that can be bought

  • open : the stock price when the session opened

  • ask : the price of the stock when you buy them (seller price)

  • bid : the price of the stock when you sell them (buyer price)

You can check Wikipedia for more details. ## Event bus services - Portfolio service

In the quote generator we have seen the basics of Vert.x development:

  • Asynchronous API and AsyncResult

  • Implementing Handler

  • Receiving messages from the event bus

In the portfolio component, we are going to implement an event bus service. A Portfolio stores the owned shares and the available cash.

6.6. RPC and Async RPC

Microservices are not only about REST. They can be exposed using any types of interactions, and Remote Procedure Calls is one of them. With RPC, a component can effectively send a request to another component by doing a local procedure call, which results in the request being packaged in a message and sent to the callee. Likewise, the result is sent back and returned to the caller component as the result of the procedure call:

rpc sequence

Such interactions has the advantages to introduce typing, and so is less error-prone than unstructured messages. However, it also introduces a tighter coupling between the caller and the callee. The caller knows how to call the callee:

  1. how the service is called

  2. where the service is living (location)

Traditional RPC exhibits an annoying drawback: the caller waits until the response has been received. This is definitely a blocking call, as it involves at least two network messages. In addition, it is not really designed for failures while distributed communications have many reasons to fail.

Fortunately, Vert.x proposes a different form of RPC: Async RPC. Async RPC follows the same principles as RPC, but instead of waiting for the response, it passes a Handler<AsyncResult<X> called when the result is received.

async rpc sequence

The AsyncResult notifies the Handler whether the invocation succeeded or failed. Upon success, the handler can retrieve the result.

Such async-RPC has several advantages:

  • the caller is not blocked

  • it deals with failures

  • it avoids you to send messages on the event bus and manages object marshalling and unmarshalling for you.

6.7. Async service interface

To create an async RPC service, or event bus service, or service proxies, you first need a Java interface declaring the async methods. Open the io.vertx.workshop.portfolio.PortfolioService class.

The class is annotated with:

  • ProxyGen - enables the event bus service proxy and server generation

  • VertxGen - enables the creation of the proxy in the different language supported by Vert.x

Let’s have a look at the first method:

void getPortfolio(Handler<AsyncResult<Portfolio>> resultHandler);

This method lets you retrieve a Portfolio object. As explained above the method is asynchronous and so has a Handler parameter receiving an AsyncResult<Portfolio>. The other methods follows the same pattern.

You may have also noticed that the package as a package-info.java file. This file is required to enable the service proxy generation.

6.8. Data objects

The Portfolio object is a data object. Event bus proxies support a limited set of types, and for non-supported types, it must use data objects (please check the documentation for the whole list of supported types). Data objects are Java classes obeying to a set of constraints:

  • It must be annotated with DataObject

  • It must have an empty constructor, a copy constructor and a constructor taking a JsonObject as parameter

  • It must have a toJson method building a JsonObject representing the current object

  • Fields must be property with (getters and setters)

Let’s open the io.vertx.workshop.portfolio.Portfolio class to see how it looks like. As you can see, all the JSON handling is managed by converters that are automatically generated, so a data object is very close to a simple bean.

6.9. Implementing the service

It’s nice to have an async interface for our service, but it’s time to implement it. We are going to implement three methods in this service:

  • getPortfolio to understand how to create AsyncResult objects

  • sendActionOnTheEventBus to see how to send messages on the event bus

  • evaluate computing the current value of the portfolio

6.9.1. Task - Creating AsyncResult instances

As we have seen above, our async service have Handler<AsyncResult<Portfolio>> parameter. So when we implement this service, we would need to call the Handler with an instance of AsyncResult. To see how this works, let’s implement the getPortfolio method:

In io.vertx.workshop.portfolio.impl.PortfolioServiceImpl, fill the getPortfolio method. It should call the handle method of the resultHandler with a successful async result. This object can be created from the (Vert.x) Future method.

resultHandler.handle(Future.succeededFuture(portfolio));

Wow…​ one single line ? Let’s dissect it:

  • resultHandler.handle : this is to invoke the Handler. Handler<X> has a single method (handle(X)).

  • Future.succeededFuture : this is how we create an instance of AsyncResult denoting a success. The passed value is the result (portfolio)

But, wait, what is the relationship between AsyncResult and Future ? A Future represents the result of an action that may, or may not, have occurred yet. The result may be null if the Future is used to detect the completion of an operation. The operation behind a Future object may succeed or fail. AsyncResult is a structure describing the success of the failure of an operation. So, Future are AsyncResult. In Vert.x AsyncResult instances are created from the Future class.

AsyncResult describes:

  • a success as shown before, it encapsulates the result

  • a failure, it encapsulates a Throwable instance

Did you know that the term Future has been introduced in 1977, Promise in 1976…​ Not really new things.

So, how does this work with our async RPC service, let’s look at this sequence diagram:

portfolio sequence

6.9.2. Task - Sending event on the event bus

In the previous chapter, we have registered a consumer receiving event bus services, it’s time to see how to send messages on the event bus. You access the event bus using vertx.eventBus(). From this object you can:

  • send : send a message in point to point mode

  • publish : broadcast a message to all consumers registered on the address

  • send with a Handler<AsyncResult<Message>>>: send a message in point to point mode and expect a reply

In the last point, notice the AsyncResult<Message>. It’s an async result as the reply may never arrive (and so will be considered as a failure).

Ok, back to our code. We have provided the buy and sell methods, that are just doing some checks before buying or selling shares. Once the action is emitted, we send a message on the event bus that will be consumed by the Audit Service and the Dashboard. So, we are going to use the publish method.

Write the body of the sendActionOnTheEventBus method in oder to publish a message on the EVENT_ADDRESS address containing a JsonObject as body. This object must contains the following entries:

  • action → the action (buy or sell)

  • quote → the quote as Json

  • date → a date (long in milliseconds)

  • amount → the amount

  • owned → the updated (owned) amount

vertx.eventBus().publish(EVENT_ADDRESS, new JsonObject()
    .put("action", action)
    .put("quote", quote)
    .put("date", System.currentTimeMillis())
    .put("amount", amount)
    .put("owned", newAmount));

Let’s have a deeper look:

  1. it gets the EventBus instance and call publish on it. The first parameter is the address on which the message is sent

  2. the body is a JsonObject containing the different information on the action (buy or sell, the quote (another json object), the date…​

6.9.3. Task - Coordinating async methods and consuming HTTP endpoints - Portfolio value evaluation

The last method to implement is the evaluate method. This method computes the current value of the portfolio. However, for this it needs to access the "current" value of the stock (so the last quote). It is going to consume the HTTP endpoint we have implemented in the quote generator. For this, we are going to:

  • discover the service

  • call the service for each company we own some shares

  • when all calls are done, compute the value and send it back to the caller

That’s a bit more tricky, so let’s do it step by step. First, in the evaluate, we need to retrieve the HTTP endpoint (service) provided by the quote generator. This service is named quotes. We published in in the previous section. So, let’s start to get this service.

Fill the evaluate method to retrieve the quotes service. You can retrieve Http services using HttpEndpoint.getClient. The name of the service is quotes. If you can’t retrieve the service, just passed a failed async result to the handler. Otherwise, call computeEvaluation.

HttpEndpoint.getClient(discovery, new JsonObject().put("name", "quotes"), (1)
  client -> {
       if (client.failed()) {                                                     (2)
         // It failed...
         resultHandler.handle(Future.failedFuture(client.cause()));
       } else {
         // We have the client
         HttpClient httpClient = client.result();                                 (3)
         computeEvaluation(httpClient, resultHandler);
       }
 });
1 Get the HTTP Client for the requested service.
2 The client cannot be retrieved (service not found), report the failure
3 We have the client, let’s continue…​

Here is how the computeEvaluation method is implemented:

private void computeEvaluation(HttpClient httpClient, Handler<AsyncResult<Double>> resultHandler) {
    // We need to call the service for each company we own shares
    List<Future> results = portfolio.getShares().entrySet().stream()
        .map(entry -> getValueForCompany(httpClient, entry.getKey(), entry.getValue()))    (1)
        .collect(Collectors.toList());


    // We need to return only when we have all results, for this we create a composite future.
    // The set handler is called when all the futures has been assigned.
    CompositeFuture.all(results).setHandler(                                            (2)
      ar -> {
        double sum = results.stream().mapToDouble(fut -> (double) fut.result()).sum();  (3)
        resultHandler.handle(Future.succeededFuture(sum));                              (4)
    });
}

First, we need to get a list of Future that would receive the different evaluations (one per company) (1). This evaluation is asynchronous (as it involves a HTTP call to get the latest value). We don’t know when these Future will be all valuated (or assigned). Fortunately, Vert.x provides CompositeFuture for this very purpose (2). CompositeFuture.all calls its assigned handler when all the given Futures are assigned. So when the handler is executed, we knows all the futures has received a value, and so we can compute the sum (3). Finally, we send this result to the client by calling the resultHandler (4).

Well, we just need the getValueForCompany method that call the service. Write the content of this method. You would need to create a Future object to report the completion of the operation. This future is the "returned" result of the method. Then, call the HTTP endpoint (/?name= + encode(company)).

The encode(String) method is provided for you.

When the response arrives, check the status (should be 200) and retrieve the body (with bodyHandler). The body can be parsed as a JsonObject using buffer.toJsonObject(). The value you compute is the numberOfShares * the bid price (read from the body). Once the value is computed, complete the future. Don’t forget to report failures to the future too. To simplify, if the company is unknown (meaning the response status code is not 200) we assume the value of the shares to be 0.0.

private Future<Double> getValueForCompany(HttpClient client, String company, int numberOfShares) {
  // Create the future object that will  get the value once the value have been retrieved
  Future<Double> future = Future.future();                                           (1)

  client.get("/?name=" + encode(company), response -> {                              (2)
    response.exceptionHandler(future::fail);                                         (3)
    if (response.statusCode() == 200) {
      response.bodyHandler(buffer -> {
        double v = numberOfShares * buffer.toJsonObject().getDouble("bid");
        future.complete(v);                                                          (4)
      });
    } else {
      future.complete(0.0);                                                          (5)
    }
  })
    .exceptionHandler(future::fail)                                                  (6)
    .end();                                                                          (7)

  return future;
}

First, we create the Future object that will be returned by the method (1). Then, we use the HTTP client to retrieve the last quote of the company with the get method (2). get prepares the request but does not emit it until end is called. The HTTP Client is already configured with the right IP and port (the service discovery manages this). When we get the response, we have to register an exceptionHandler (3) to catch failures that may happen when receiving the response body. Then we can read the body and compute the evaluation. When done, we assigned a value to the Future (4). If the company cannot be found, we evaluate these shares to 0.0 (5).

As the connection to the server may fail, we should also register an exceptionHandler on the client itself (6). Finally, we emit the request using the end method (7).

6.10. Task - Publishing a service

Now that the service implementation is complete, let’s publish it ! First we need a verticle that creates the actual service object, registers the service on the event bus and publishes the service in the service discovery infrastructure.

Open the io.vertx.workshop.portfolio.impl.PortfolioVerticle class. In its start method is does what we just say:

1) Create the service object with:

PortfolioServiceImpl service = new PortfolioServiceImpl(vertx, discovery, config().getDouble("money", 10000.00));

2) Register it on the event bus using the ProxyHelper class:

ProxyHelper.registerService(PortfolioService.class, vertx, service, ADDRESS);

3) Publish the service in the service discovery infrastructure to make it discoverable:

publishEventBusService("portfolio", ADDRESS, PortfolioService.class, ar -> {
  if (ar.failed()) {
    ar.cause().printStackTrace();
  } else {
    System.out.println("Portfolio service published : " + ar.succeeded());
  }
});

The publishEventBusService is implemented as follows:

// Create the service record:
Record record = EventBusService.createRecord(name, address, serviceClass);
// Publish it using the service discovery
discovery.publish(record, ar -> {
  if (ar.succeeded()) {
    registeredRecords.add(record);
    completionHandler.handle(Future.succeededFuture());
  } else {
    completionHandler.handle(Future.failedFuture(ar.cause()));
  }
});

Are we done ? No…​. We have a second service to publish. Remember, we are also sending messages on the event bus when we buy or sell shares. This is also a service (a message source service to be exact).

At the end of the start method, write the code required to publish the portfolio-events service. EVENT_ADDRESS is the event bus address.

there are publish methods available depending of your service type.
publishMessageSource("portfolio-events", EVENT_ADDRESS, ar -> {
  if (ar.failed()) {
    ar.cause().printStackTrace();
  } else {
    System.out.println("Portfolio Events service published : " + ar.succeeded());
  }
});

Now we are done, and it’s time to build and run this service.

6.11. Run time !

To build the project launch:

cd portfolio-service
mvn clean package

Then, launch it, in another terminal with:

java -jar target/portfolio-service-1.0-SNAPSHOT-fat.jar

Here you go, the portfolio service is started. It discovers the quotes service and is ready to be used.

Go back to the dashboard, and you should see some new services and the cash should have been set in the top left corner.

The dashboard is consuming the portfolio service using the async RPC mechanism. A client for JavaScript is generated at compile time, and use SockJS to communicate. Behind the hood there is a bridge between the event bus and SockJS.

Well, it’s time to buy and sell some shares no ? Let’s do that in the next chapter. ## The compulsive traders

In the portfolio project we have implemented an event bus service to manage our portfolio. In the quote-generator we send quotes on the event bus. Traders are components that consumes these two services with only one goal: getting rich (or not…​)!

In this section, we are going to develop 2 traders (following a stupid logic you can definitely improve):

  • the first trader is developed in Java

  • the second trader is developed in Groovy

6.12. Compulsive and dumb traders

Before seeing how these are implemented, let’s explain the absolutely illogic algorithm used by these traders:

  1. A compulsive trader is picking randomly one company name and a number of shares (x)

  2. Randomly, it tries to buy or sell x shares of the company

It does not check whether or not it has enough shares or money, it just tries…​ This logic is implemented in io.vertx.workshop.trader.impl.TraderUtils.

6.13. Deploying several instance and polyglot verticles.

The compulsive-trader project contains a main-verticle (io.vertx.workshop.trader.impl.MainVerticle) that is going to configure and deploy the traders:

@Override
public void start() throws Exception {

    // Java traders
    vertx.deployVerticle(JavaCompulsiveTraderVerticle.class.getName(),
        new DeploymentOptions().setInstances(2));                           (1)

    // Groovy traders...
    vertx.deployVerticle("GroovyCompulsiveTraderVerticle.groovy");          (2)
}

The JavaCompulsiveTraderVerticle is deployed with some DeploymentOptions (1). This one sets the number of instances to 2, so Vert.x is not instantiating this verticle once, with twice (2 different objects). So, the previous code deploys three traders.

The Groovy verticle is deployed by passing the file name. The verticle file is in src/main/resources and will be located at the root of the fat-jar.

It’s now time to implement these verticles

6.14. The Java Trader

Open the io.vertx.workshop.trader.impl.JavaCompulsiveTraderVerticle class. In the place of the TODO we need:

  1. Initialize the trader

  2. Retrieve the 2 services we use

  3. When both have been retrieve, apply the trading logic on every new market data.

6.14.1. Task - 1. Initialize the trader

First, look at the start method signature: start(Future<Void> future). The future let us tell when the initialization has been completed and whether or not it has been successful. That means we have to complete or fail this future explicitly.

To initialize the trader, remove the future.fail statement and initialize the company and numberOfShares variables by using the TraderUtils class.

String company = TraderUtils.pickACompany();
int numberOfShares = TraderUtils.pickANumber();
System.out.println("Java compulsive trader configured for company " + company + " and shares: " + numberOfShares);

6.14.2. Task - 2. Retrieving several services

The trader needs the Portfolio service and the market service (the message source sending the market data). We cannot start the trading logic before having retrieved both of them. Let’s use the Future composition feature we have seen in the previous section.

Write the code (just after the previous code you wrote) required to retrieve the 2 services. When both are retrieved (use the all composition operator), just do nothing. The handler will be filled in the next step.

// We need to retrieve two services, create two futures object that
// will get the services
Future<MessageConsumer<JsonObject>> marketFuture = Future.future();             (1)
Future<PortfolioService> portfolioFuture = Future.future();

// Retrieve the services, use the "special" completed to assign the future
MessageSource.getConsumer(discovery, new JsonObject().put("name", "market-data"),
    marketFuture.completer());                                                  (2)
EventBusService.getProxy(discovery, PortfolioService.class,
    portfolioFuture.completer());                                               (3)

// When done (both services retrieved), execute the handler
CompositeFuture.all(marketFuture, portfolioFuture).setHandler(ar -> {           (4)
   // Next....
});

First create the 2 Future objects that will receive the two services once retrieved (1). In (2) we retrieve the message source service and use a special Handler to assign the Future. The completer is basically a Handler receiving the result and setting the value in the Future or mark it as failed. The same approach is used in (3) to retrieve the Portfolio service.

Finally, in <4>, we create a CompositeFuture that execute the attached Handler when all the listed Futures has been assigned.

6.14.3. Task - 3. Applying the trading logic

Almost done! Now write the last handler. If the retrieval as failed, just report the failure on future. Otherwise, registers a message consumer on the market service and every time you get a message, apply the TraderUtils.dumbTradingLogic method. Then complete future.

if (ar.failed()) {                                                               (1)
    future.fail("One of the required service cannot " +
            "be retrieved: " + ar.cause());
} else {
    // Our services:                                                             (2)
    PortfolioService portfolio = portfolioFuture.result();
    MessageConsumer<JsonObject> marketConsumer = marketFuture.result();

    // Listen the market...
    marketConsumer.handler(message -> {                                          (3)
        JsonObject quote = message.body();
        TraderUtils.dumbTradingLogic(company, numberOfShares, portfolio, quote); (4)
    });

    future.complete();                                                           (5)
}

First, we need to check if the service retrieval has been successful. This is made in (1). If so, we can unwrap the services from the Future objects (2). We set a Handler on the marketConsumer (the message source service) executing the trading logic (4). Finally, in (5), we complete the future given as parameter of the start method to notify of the completion of the initialization process. The complete method does not pass a result as we just notify of the completion. Notice also the future.fail to denote an initialization issue.

6.14.4. The code in one snippet

Here is the whole code.

    super.start();

    String company = TraderUtils.pickACompany();
    int numberOfShares = TraderUtils.pickANumber();
    System.out.println("Java compulsive trader configured for company " + company + " and shares: " + numberOfShares);

    // We need to retrieve two services, create two futures object that will get the services
    Future<MessageConsumer<JsonObject>> marketFuture = Future.future();
    Future<PortfolioService> portfolioFuture = Future.future();
    // Retrieve the services, use the "special" completed to assign the future
    MessageSource.getConsumer(discovery, new JsonObject().put("name", "market-data"), marketFuture.completer());
    EventBusService.getProxy(discovery, PortfolioService.class, portfolioFuture.completer());

    // When done (both services retrieved), execute the handler
    CompositeFuture.all(marketFuture, portfolioFuture).setHandler(ar -> {
      if (ar.failed()) {
        future.fail("One of the required service cannot " +
            "be retrieved: " + ar.cause());
      } else {
        // Our services:
        PortfolioService portfolio = portfolioFuture.result();
        MessageConsumer<JsonObject> marketConsumer = marketFuture.result();

        // Listen the market...
        marketConsumer.handler(message -> {
          JsonObject quote = message.body();
          TraderUtils.dumbTradingLogic(company, numberOfShares, portfolio, quote);
        });

        future.complete();
      }
    });

6.14.5. Run this trader

We can already run this trader and see if it makes educated actions on the market. Package it using:

cd compulsive-traders
mvn clean package

Then launch the application with:

java -jar target/compulsive-traders-1.0-SNAPSHOT-fat.jar

If you go back to the dashboard, you may start seen some moves on your portfolio.

6.15. Task - Writing a Groovy Verticle

The Groovy trader is using the same trading logic, but this verticle is going to be developed in Groovy. To ease the understanding, the code is going to be very close to the Java version.

Open src/main/resources/GroovyCompulsiveTraderVerticle.groovy. This verticle is going to be a Groovy Script. So the content is the start method of the verticle. Vert.x also supports Groovy classes.

If you don’t know Groovy, just copy and paste the solution. If you do, you can try to implement the trader by yourself following the same logic as the Java trader:

  • Groovy version of the Vert.x API are in io.verx.groovy.x.y, for instance io.vertx.groovy.core.CompositeFuture.

  • Json Objects are Groovy Maps, so a MessageConsumer<JsonObject> in Java is a MessageConsumer<Map> in Groovy.

import io.vertx.groovy.core.CompositeFuture
import io.vertx.groovy.core.Future
import io.vertx.groovy.core.eventbus.MessageConsumer
import io.vertx.groovy.servicediscovery.types.EventBusService;
import io.vertx.groovy.servicediscovery.types.MessageSource;
import io.vertx.groovy.servicediscovery.ServiceDiscovery
import io.vertx.workshop.portfolio.PortfolioService
import io.vertx.workshop.trader.impl.TraderUtils

def company = TraderUtils.pickACompany();
def numberOfShares = TraderUtils.pickANumber();

println("Groovy compulsive trader configured for company " + company + " and shares: " + numberOfShares);

// We create the discovery service object.
def discovery = ServiceDiscovery.create(vertx);

Future<MessageConsumer<Map>> marketFuture = Future.future();
Future<PortfolioService> portfolioFuture = Future.future();

MessageSource.getConsumer(discovery,
        ["name" : "market-data"], marketFuture.completer());
EventBusService.getProxy(discovery,
        "io.vertx.workshop.portfolio.PortfolioService", portfolioFuture.completer());

// When done (both services retrieved), execute the handler
CompositeFuture.all(marketFuture, portfolioFuture).setHandler( { ar ->
  if (ar.failed()) {
    System.err.println("One of the required service cannot be retrieved: " + ar.cause());
  } else {
    // Our services:
    PortfolioService portfolio = portfolioFuture.result();
    MessageConsumer<Map> marketConsumer = marketFuture.result();

    // Listen the market...
    marketConsumer.handler( { message ->
      Map quote = message.body();
      TraderUtils.dumbTradingLogic(company, numberOfShares, portfolio, quote);
    });
  }
});

As you can see the code if very close to the Java one. Let’s spot the differences:

  • Well, it’s Groovy. When an interface is annotated with @VertxGen, Vert.x translates it to all the supported (and configured) languages

  • On the import statements, you can see it does import groovy version of the packages.. Each translation tries to be as close as possible to the language idioms, that’s why you have Maps in Groovy

  • We have to create the service discovery (as before it was made by a parent class)instead of JsonObject.

  • JsonObjects are Maps. In Groovy, it makes more sense to use Map objects.

This example has been developed in Groovy, but it would be similar in JavaScript, Ruby or Ceylon.

It’s time to rebuild and restart our traders. Hit CTRL+C to stop the running trader. Then, rebuild with:

mvn clean package

And launch the application with:

java -jar target/compulsive-traders-1.0-SNAPSHOT-fat.jar

If you go back to the dashboard, you may start seen some moves on your portfolio. Now 3 traders are trying to make you (virtually) rich.

7. The Audit service

The law is the law…​ The Sarbanes–Oxley Act requires you to keep a track of every transaction you do on a financial market. The audit service records the shares you buy and sell in a database. It’s going to be a HSQL database, but is would be similar with another database, even no-sql database.

7.1. Accessing data asynchronously

As said previously, Vert.x is asynchronous and you must never block the event loop. And you know what’s definitely blocking ? Database accesses and more particularly JDBC! Fortunately, Vert.x provides a JDBC client that is asynchronous.

The principle is simple (and is applied to all clients accessing blocking systems):

database sequence
Worker ? Yes, Vert.x has the notion of workers (a separated thread pool) to execute blocking code. It can be a verticle marked as worker or with the vertx.executeBlocking construct. However, even if possible, you should not abuse from these features as it reduces the scalability of the application.

However, interactions with databases are rarely a single operation, but a composition of operations. For example:

  1. Get a connection

  2. Drop some tables

  3. Create some tables

  4. Close the connection

So, we need a way to compose these operations, and report failures when required. This is what we are going to see in the Audit component.

7.2. The Audit service

The Audit service:

  1. Listens for the financial operations on the event bus

  2. Stores the received operations in a database

  3. Exposes a REST API to get the last 10 operations

Interactions with the database use the vertx-jdbc-client, an async version of JDBC. So expect to see some SQL code (I know you love it).

7.3. Task - Composing methods returning Future.

Open the io.vertx.workshop.audit.impl.AuditVerticle class. The first important detail of this verticle is its start method. As the start method from the Java trader, the method is asynchronous, and report its completion in the given Future object:

public void start(Future<Void> future) {
     // creates the jdbc client.
     jdbc = JDBCClient.createNonShared(vertx, config());

     // TODO
     // ----
     future.fail("not implemented yet");
     // ----
}

Vert.x would consider the verticle deploy when the Future is valuated. It may also report a failure if the verticle cannot be started correctly.

Initializing the audit service includes:

  • prepare the database (table)

  • start the HTTP service and expose the REST API. In addition publish this endpoint as a service

  • retrieve the message source on which the operation are sent

So, it’s clearly 3 independent actions, but the audit service is started only when all of them has been completed.

Replace the TODO block with some code. This code should retrieves 3 future objects (from methods provided in the class) and wait for the completion of the three tasks. When there, register a Handler on the portfolio message source calling storeInDatabase for each received message. Don’t forget to complete or fail future.

Future<MessageConsumer<JsonObject>> messageListenerReady = retrieveThePortfolioMessageSource();
Future<Void> databaseReady = initializeDatabase(config().getBoolean("drop", false));
Future<Void> httpEndpointReady = configureTheHTTPServer().compose(
        server -> {
          Future<Void> regFuture = Future.future();
          publishHttpEndpoint("audit", "localhost", server.actualPort(), regFuture.completer());
          return regFuture;
        }
    );

CompositeFuture.all(httpEndpointReady, databaseReady, messageListenerReady)
    .setHandler(ar -> {
      if (ar.succeeded()) {
        // Register the handle called on messages
        messageListenerReady.result().handler(message -> storeInDatabase(message.body()));
        // Notify the completion
        future.complete();
      } else {
        future.fail(ar.cause());
      }
    });

First we create 3 Futures, one per action to execute. We are going to see how they are created in a minute. Then we compose all of them using the CompositeFuture.all operator. When all the actions are completed, it calls the given handler that registers the message listener storing the operation in the database.

To notify Vert.x that the start process is completed (or successfully or not), it calls future.complete() and future.fail(cause).

7.4. Task - Implementing a method returning a Future

We have mentioned that async method have a signature with a Handler as last parameter. There is an equivalent syntax that returns a Future object when the operations they are executing are completed:

void asyncMethod(a, b, Handler<R> handler);
// is equivalent to
Future<R> asyncMethod(a, b);

Indeed, the caller can attach a Handler on the returned Future object to be notified when the operation has completed:

Future<R> future = asyncMethod(a, b);
future.setHandler(ar -> {
  if (ar.failed()) { /* the operaiton has failed */ }
  else {
    // Do something with the result
  }
});

Let’s implement the configureTheHTTPServer method following this pattern. In this method we are going to use a new Vert.x Component: Vert.x Web. Vert.x Web is a Vert.x extension to build modern web application. Here we are going to use a Router which let us implement REST APIs easily (à la Hapi or ExpressJS). So:

  1. Create the future object your are going to return

  2. Create a Router object with: Router.router(vertx)

  3. Register a route (on /) on the router, calling retrieveOperations

  4. Create a HTTP server on the port passed in the configuration or 0 if not set (it picks an available port), and delegating the request handler to router.accept. We can pick a random port as it is exposed in the service record, so consumer are bound to the right port.

  5. When the server has started completes the future

private Future<HttpServer> configureTheHTTPServer() {
    Future<HttpServer> future = Future.future();

    // Use a Vert.x Web router for this REST API.
    Router router = Router.router(vertx);
    router.get("/").handler(this::retrieveOperations);

    vertx.createHttpServer()
        .requestHandler(router::accept)
        .listen(config().getInteger("http.port", 0), future.completer());

    return future;
}

It creates a Router. The Router is an object from Vert.x web that ease the creation of REST API with Vert.x. We won’t go into too much details here, but if you want to implement REST API with Vert.x, this is the way to go. On our Router we declare a route: when a request arrive on /, it calls this Handler. Then, we create the HTTP server. The requestHandler is a specific method of the router, and in the listen handler we pass future.completer(). This completer is a sugar roughly doing:

if (ar.failed()) { future.fail(ar.cause()); }
else { future.complete(ar.result()); }

So, the caller can call this method and get a Future. It can register a Handler on it that is called when the Future receives a value (or a failure).

If you look at the retrieveThePortfolioMessageSource, you would see the very same pattern.

7.5. Using Async JDBC

In the start method, we are calling initializeDatabase. Let’s look at this method using another type of action composition. This method:

  • get a connection to the database

  • drop the table

  • create the table

  • close the connection (whatever the result of the two last operations)

All these operations may fail.

In the last paragraph we have seen methods returning Future. Chains are a composition of such functions:

  1. you have an input

  2. you execute a first Function taking the input from (1) and returning a Future

  3. you execute a second Function taking the input from (2) and returning a Future

  4. …​.

The completion of a chain is a Future object. If one of the chained operation fails, this Future is marked as failed, otherwise it is completed with the result of the last operation:

Future<X> chain = Chain.chain(input, function1, function2, function3);

A chain can be triggered by a Future too, so is started upon the successful completion of the given Future.

So to use the composition pattern, we just need a set of Functions and a Future that would trigger the chain. Let’s create this Future first:

// This future will be assigned when the connection with the database is established.
// We are going to use this future as a reference on the connection to close it.
Future<SQLConnection> connectionRetrieved = Future.future();
// Retrieve a connection with the database, report on the databaseReady if failed, or
// assign the connectionRetrieved future.
jdbc.getConnection(connectionRetrieved.completer());

Then, we need two functions taking a SQLConnection as parameter

  1. The first function drops the table if needed

  2. The second function creates the table

// When the connection is retrieved, we want to drop the table (if drop is set to true)
Function<SQLConnection, Future<SQLConnection>> dropTable = connection -> {
  Future<SQLConnection> future = Future.future();
  if (!drop) {
    future.complete(connection); // Immediate completion.
  } else {
    connection.execute(DROP_STATEMENT, completer(future, connection));
  }
  return future;
};

// When the table is dropped, we recreate it
Function<SQLConnection, Future<Void>> createTable = connection -> {
  Future<Void> future = Future.future();
  connection.execute(CREATE_TABLE_STATEMENT, future.completer());
  return future;
};

As explained above, both function returns a Future object. The result of the first function is used as input of the second function.

It’s now time to build the chain:

// Ok, now it's time to chain all these actions:
// connectionRetrieved -> dropTable -> createTable -> in all case
// close the connection

Chain.chain(connectionRetrieved, dropTable, createTable)       (1)
    .setHandler(ar -> {                                        (2)
      // Whatever the result, if the connection has been
      // retrieved, close it
      if (connectionRetrieved.result() != null) {               (3)
        connectionRetrieved.result().close();
      }

      // Complete the main future with the result.
      databaseReady.completer().handle(ar);                     (4)
    });

In (1) we create the chain. The first parameter is the Future triggering the chain. The two other parameters are the chained functions. We attach a Handler to the returned Future that is executed when the chain has been executed with the result (2). In (3), we check whether we have a connection and close it. Finally, in (4) we complete the main Future (returned by the method).

7.6. Task - Async JDBC with a callback-based composition

You may ask why we do such kind of composition. Let’s implement a method without any composition operator (just using callbacks). The retrieveOperations method is called when a HTTP request arrives and should return a JSON object containing the last 10 operations. So, in other words:

  1. Get a connection to the database

  2. Query the database

  3. Iterate over the result to get the list

  4. Write the list in the HTTP response

  5. Close the database

The step (1) and (2) are asynchronous. (5) is asynchronous too, but we don’t have to wait for the completion. In this code, don’t use composition (that’s the purpose of this exercise). In retrieveOperations, write the required code using Handlers / Callbacks.

// 1 - we retrieve the connection
jdbc.getConnection(ar -> {
  SQLConnection connection = ar.result();
  if (ar.failed()) {
    context.fail(ar.cause());
  } else {
    // 2. we execute the query
    connection.query(SELECT_STATEMENT, result -> {
      ResultSet set = result.result();

      // 3. Build the list of operations
      List<JsonObject> operations = set.getRows().stream()
          .map(json -> new JsonObject(json.getString("OPERATION")))
          .collect(Collectors.toList());

      // 4. Send the list to the response
      context.response().setStatusCode(200).end(Json.encodePrettily(operations));

      // 5. Close the connection
      connection.close();
    });
  }
});

So obviously it’s possible too not use composition. But imagine when you have several asynchronous operation to chain, it become a callback hell very quickly. So, as a recommendation: use the Vert.x composition operators.

All the composition operators (all, compose, any, chain…​) are implemented on top of callbacks. The pure async programming is using callbacks, Future and composition are there to provide a direct correspondence between synchronous functions and asynchronous operations and so ease the implementation of complex processes.

7.7. Show time !

Let’s see how this works.

First you need to built it:

cd audit-service
mvn clean package

Then, you need to launch the application:

 java -jar target/audit-service-1.0-SNAPSHOT-fat.jar

Restart and refresh the dashboard, and you should see the operations in the top right corner!

8. Anatomy of the dashboard

This section is about the dashboard. It covers:

  1. How to configure configure Vert.x web to expose static resources

  2. How to implement a REST endpoint delegating to another REST endpoint (proxy pattern)

  3. How to protect microservices interaction against failures (exception handler, timeout, circuit breaker)

  4. How to configure the SockJS - Eventbus bridge

  5. How can you consume an event bus proxy from the browser

The dashboard is a single verticle (io.vertx.workshop.dashboard.DashboardVerticle).

8.1. Vert.x Web and static files

As mentioned in the previous section, Vert.x web is a Vert.x component to build web application. Its whole architecture is centered on the Router object. You create this router and configure the routes. For each route you configure the HTTP verb and the path and associate the Handler that is called when a matching request is received. The router is creates a follows:

Router router = Router.router(vertx);

Vert.x web provides a set of Handler for common tasks such as serving static files:

// Static content
router.route("/*").handler(StaticHandler.create());

It serves all files from the webroot directory (default) or the server root. For example, webroot/index.html is served using the http://0.0.0.0:8080/index.html url.

Once the router is configured, you need a HTTP server and use the router to handle the requests:

vertx.createHttpServer()
    .requestHandler(router::accept)
    .listen(8080);

8.2. Delegating REST calls

It’s often required to implement a REST API on top of another one. This pattern can be very costly on traditional architecture as each call would block a thread until the call to this second REST API has completed. With Vert.x delegation is easy, asynchronous and non blocking.

For example in the dashboard, we want to retrieve the list of operations. This list is offered by the audit service. So in the dashboard we have this route definition:

router.get("/operations").handler(this::callAuditService);

And the handler is:

private void callAuditService(RoutingContext context) {
    if (client == null) {
      context.response()
          .putHeader("content-type", "application/json")
          .setStatusCode(200)
          .end(new JsonObject().put("message", "No audit service").encode());
    } else {
      client.get("/", response -> {
        response
            .bodyHandler(buffer -> {
              context.response()
                  .putHeader("content-type", "application/json")
                  .setStatusCode(200)
                  .end(buffer);
            });
      })
          .end();
    }
  }

The audit service is retrieved in the verticle start method. If the service was not found, it returns a no audit service message. Otherwise, it uses the retrieved HTTP client to call the audit REST API. The response is simply sent back to the HTTP response.

8.3. Task - Managing failures with exception handler and timeout

The callAuditService method we have seen in the previous section does the job…​ but well, what we do know about distributed systems: they fail. It would be better to manage failures in this method. Vert.x proposes four ways to handle failures:

  • failed asynchronous results and future

  • exception handlers

  • timeout

  • circuit breaker

We have already covered the first point. In this section we cover the point 2 and 3. The next section is about circuit breakers.

Exception handlers are handlers receiving a Throwable parameter and are responsible for managing the reported error. It is used on object that does not called a Handler<AsyncResult<?>> such as on a HTTP request. During a HTTP request-response interaction, failures can happen at several places:

  • a connection issue - the server does not reply or cannot be found.

  • a response issue - the server is there, but the connection is lost while retrieving the content

  • a server issue - the server returns a failure

This last point is application specific and need to be handle by your code. The first two points are managed by exception handlers on the 1) HTTP request, 2) HTTP response objects. Vert.x uses two different handlers because the root issue is different.

Copy this method into another one called callAuditServiceWithExceptionHandler, and set the exceptionHandler on the HTTP request and HTTP response. Both should call the fail method on the RoutingContext parameter. In addition, on the request, configure the response timeout. The exception handler set on the request is called if the response cannot be retrieved before this timeout. Don’t forget to update the handler of the /operations route to call this new method.

private void failureAwarelastOperations(RoutingContext context) {
  if (client == null) {
    context.response()
        .putHeader("content-type", "application/json")
        .setStatusCode(200)
        .end(new JsonObject().put("message", "No audit service")
            .encode());
  } else {
    client.get("/", response -> {
      response
          .exceptionHandler(context::fail)                      (1)
          .bodyHandler(buffer -> {
            context.response()
                .putHeader("content-type", "application/json")
                .setStatusCode(200)
                .end(buffer);
          });
    })
        .exceptionHandler(context::fail)                        (2)
        .setTimeout(5000)                                       (3)
        .end();
  }
 });
}
1 Exception handler called when reading the response fails
2 Exception handler called when the connection with the server cannot be established
3 Timeout configuration

Once done, build the dashboard with:

cd trader-dashboard
mvn clean package

Then, launch it, in another terminal with:

java -jar target/trader-dashboard-1.0-SNAPSHOT-fat.jar

Refresh the dashboard page. In the audit service terminal, stop the service and check how is reacting the dashboard (you can look at the AJAX request in the inspector/dev tools). Then, relaunch the audit service. What is happening ?

8.4. Task - Managing failures with a circuit breaker

Circuit breaker is a reliability pattern that can be represented with a simple state machine:

Circuit Breaker States

This pattern is very popular in microservice based applications, because it recovers (if possible) from failures smoothly. The circuit breaker starts in the close state. The circuit breaker monitors an operation. Every time this operation fails, it increases a failure counter. When a threshold is reached, it goes to the open state. In this state, the required service is not called anymore, but a fallback is executed immediately. After some time, the circuit breaker goes into the half-open state. In this state, the operation is called for the first request. Other request are redirected to the fallback. If the operation fails, the circuit breaker goes back to the open state until the next attempt. If it succeed it goes back to the close state.

There are lots of implementations of circuit breakers, Netflix Hystrix being the most popular. Vert.x provides its own implementation. Indeed, using Hystrix can be a bit cumbersome (but possible) as it does not enforce the Vert.x threading model.

In the DashboardVerticle.java file, a circuit breaker (called circuit) is initialized in the start method:

circuit = CircuitBreaker.create(
    "http-audit-service",                        (1)
    vertx,
    new CircuitBreakerOptions()
        .setMaxFailures(2)                       (2)
        .setFallbackOnFailure(true)              (3)
        .setResetTimeout(2000)                   (4)
        .setTimeout(1000))                       (5)
    .openHandler(v -> retrieveAuditService());   (6)
1 the circuit breaker name
2 the number of failure before switching to the open state
3 whether or not the fallback should be called when a failure is detected, even in the close state
4 the time to wait in the open state before switching to the half-open state
5 the time before considering an operation as failed, if it didn’t complete
6 a handler called when the circuit breaker switches to the open state. We try to retrieve the audit service.

With this circuit breaker, write a callAuditServiceWithExceptionHandlerWithCircuitBreaker method managing the arrival and the departure of the audit service. For this, use the circuit.<Buffer>executeWithFallback method. Don’t forget to update the handler of the /operations route.

Once done, rebuild and restart the dashboard. Stop the audit service and see how it behaves. Restart it. You can see on the dashboard page the state of the circuit breaker.

private void callAuditServiceWithExceptionHandlerWithCircuitBreaker(RoutingContext context) {
    HttpServerResponse resp = context.response()
        .putHeader("content-type", "application/json")
        .setStatusCode(200);

    circuit.executeWithFallback(
        future ->
            client.get("/", response -> {
              response
                  .exceptionHandler(future::fail)
                  .bodyHandler(future::complete);
            })
                .exceptionHandler(future::fail)
                .setTimeout(5000)
                .end(),
        t -> Buffer.buffer("{\"message\":\"No audit service, or unable to call it\"}")
    )
        .setHandler(ar -> resp.end(ar.result()));
    }

In comparision to the previous solution, we report failures to the given future and not on the context. When a failure is reported, the fallback is called, returning a default buffer. We don’t have to check whether or not client is null as it throws an exception that fails the operation (and the fallback is called).

8.5. SockJS - Event bus bridge

SockJS is a browser JavaScript library that provides a WebSocket-like object. SockJS gives you a coherent, cross-browser, Javascript API which creates a low latency, full duplex, cross-domain communication channel between the browser and the web server. Under the hood SockJS tries to use native WebSockets first. If that fails it can use a variety of browser-specific transport protocols and presents them through WebSocket-like abstractions. SockJS-client does require a server counterpart to handle the communication. And you know what, Vert.x implements it !

With the SockJS - Event bus bridge, it lets the browser send and receive messages from the event bus.

To enable the bridge you need the following code:

SockJSHandler sockJSHandler = SockJSHandler.create(vertx);                      (1)
BridgeOptions options = new BridgeOptions();
options
    .addOutboundPermitted(new PermittedOptions().setAddress("market"))         (2)
    .addOutboundPermitted(new PermittedOptions().setAddress("portfolio"))
    .addOutboundPermitted(new PermittedOptions().setAddress("service.portfolio"))
    .addInboundPermitted(new PermittedOptions().setAddress("service.portfolio"));

sockJSHandler.bridge(options);                                                 (3)
router.route("/eventbus/*").handler(sockJSHandler);                            (4)

In (1), we create the SockJSHandler. It needs to be configured, as by default, for security reasons, no messages are transmitted. A set of permitted addresses configures bridge (2). Outbound addresses are for messages from the event bus to the browser, while inbound addresses are for messages from the browser to the event bus. Finally in (3) and (4), it configures the handler and create a router in the router. The /eventbus/* path is used by the SockJS client (in the browser) to negotiate the connection, receive and send the messages.

This is not the only bridge that exists for the event bus. There is also a TCP event bus bridge for native systems. Notice also, that the SockJS bridge can also be used from Node.JS.

8.6. Consuming event bus service from the browser

As said above, there is a bridge between sockJS and the event bus to let the browser sends and receives messages. As event bus services communicate using event bus messages, it is possible to implement a service client in the browser. Vert.x generates this client for you.

So, if you open the index.html file, you can see:

<script src="libs/portfolio_service-proxy.js"></script>

This imports a script generated by Vert.x (in the portfolio project). Then we can use the service as follows:

var service = new PortfolioService(eventbus, "service.portfolio");
service.getPortfolio(function (err, res) {
   // ....
}

Yes, you can call the service method directly from your browser. ## Conclusion

You made it ! Or you jumped to this section. Anyway, congratulations. We hope you enjoy this lab and learn some stuff. There is many other things Vert.x can do and that was not illustrated here.

Don’t forget that reactive systems and so Vert.x requires a mind-shift:

  • Vert.x is a toolkit to build reactive systems

  • Asynchronous, non-blocking development model can be hard to understand at the first glance, but it becomes very convenient very quickly. Don’t also forget, computers are asynchronous, so using such development model is using it the right way to use it to its whole power.

If you want, and I hope so, to go further here are some references:

9. References

Some recommended reading. Nothing especially about microservices or Vert.x because the concepts are broader than these two topics.

Appendix A - Start, Stop and List commands

Even if not required, Vert.x provides a convenient Launcher class. This class is just the entry point of the application. It initializes Vert.x from the given parameter, deploys the given verticle and so on.

First, this launcher is extensible. For instance, in this lab, we have our own launcher extending the Vert.x one (io .vertx.workshop.common.Launcher). In addition, this launcher is actually composed by an extensible set of command. `run is the default command and runs the given verticle. By default some other commands are available:

    bare      Creates a bare instance of vert.x.
    list      List vert.x applications
    run       Runs a verticle called <main-verticle> in its own instance of
              vert.x.
    start     Start a vert.x application in background
    stop      Stop a vert.x application
    test      Runs a Vert.x Unit test called <test-verticle> in its own instance
              of vert.x.

You can also add your own command.

In this section we are going to focus on the start, stop and list command.

When we have launched our microservices, they were blocking your shell (they were not launched in background). The start command lets you start a Vert.x application in background. For the quote-generator, you can launch it as follows:

java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar start -id quote-generator --redirect-output

The command name follows the fat jar name. It gets an id parameter giving a name to your application. By default, it generates a UUID. The --redirect-output flag instructs Vert.x to redirect the output to the terminal.

Now, to list the Vert.x application running, you launch:

java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar list

Listing vert.x applications...
quote-generator target/quote-generator-1.0-SNAPSHOT-fat.jar

Finally to stop the application, just launch:

java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop quote-generator

So, with this, you can easily write a few commands launching all the microservices:

# Skip this instruction to use your version
cd solution

cd quote-generator
mvn clean package
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar start -id quote-generator --redirect-output
cd ..

cd portfolio-service
mvn clean package
java -jar target/portfolio-service-1.0-SNAPSHOT-fat.jar start -id portfolio-service --redirect-output
cd ..

cd compulsive-traders
mvn clean package
java -jar target/compulsive-traders-1.0-SNAPSHOT-fat.jar start -id compulsive-traders --redirect-output
cd ..

cd audit-service
mvn clean package
java -jar target/audit-service-1.0-SNAPSHOT-fat.jar start -id audit-service --redirect-output
cd ..

cd trader-dashboard
mvn clean package
java -jar target/trader-dashboard-1.0-SNAPSHOT-fat.jar start -id trader-dashboard --redirect-output
cd ..

Once everything is launched, you can use the list command to check what’s running:

java -jar target/trader-dashboard-1.0-SNAPSHOT-fat.jar list
Listing vert.x applications...
quote-generator target/quote-generator-1.0-SNAPSHOT-fat.jar
portfolio-service       target/portfolio-service-1.0-SNAPSHOT-fat.jar
compulsive-traders      target/compulsive-traders-1.0-SNAPSHOT-fat.jar
audit-service   target/audit-service-1.0-SNAPSHOT-fat.jar
trader-dashboard        target/trader-dashboard-1.0-SNAPSHOT-fat.jar
you can use any of the fat jar to launch the list and stop commands.

We can now have the opposite script stopping everything:

cd quote-generator
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop trader-dashboard
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop quote-generator
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop audit-service
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop compulsive-traders
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop portfolio-service

The stop command send a signal to the process to ask for the termination. However, this may take time to happen (the termination). So, the process are not stopped immediately. You can check this with the list command.


1. Reactive systems and reactive programming are two different things. Reactive programming is a development model observing and manipulating data streams, while reactive systems are systems that reacts to requests, failures, load peaks and interacts using async messages.
2. Asynchronous: the caller does not wait for a returned response, but pass a callback which is executed when the result has been computed
3. Non-blocking: the code must not block the executing thread - so it must avoid blocking IO, long processing time etc.
4. This is the definition of architecture styles from the Software Engineering Institute.
5. Ability to move quickly and easily, not related to the Agile methodologies