1. Preface
Let’s start from the beginning…. Vert.x. What’s Vert.x? That’s a pretty good question, and probably a good start. If you go on the Vert.x web site, Vert.x is defined as "a toolkit for building reactive applications on the JVM". This description is rather unclear, right? What’s a toolkit? What are reactive applications? In this lab, we are going to explain these words, and build an application using Vert.x illustrating what Vert.x is. This application is going to be composed of microservices. Another buzzword that is currently trending, right ? Actually, Vert.x was already promoting microservices before anyone else.
The developed application is going to be:
-
based on Vert.x (that’s why you are here, right?)
-
distributed
-
built as a reactive system
-
(a bit) fun
This lab offers attendees an intro-level, hands-on session with Vert.x, from the first line of code, to making services, to consuming them and finally to assembling everything in a consistent reactive system. It illustrates what reactive systems are, what reactive programming is, and how to build applications based on reactive microservices (and the s is important).
This is a BYOL (Bring Your Own Laptop) session, so bring your Windows, OSX, or Linux laptop. You need JDK 8+ on your machine, and Apache Maven (3.3+).
What you are going to learn:
-
What Vert.x is and how to use its asynchronous non-blocking development model
-
How to develop microservices with Vert.x with several types of services, and service discovery
-
What
verticlesare and how to use them -
How to use the Vert.x event bus to send and receive messages
-
How to expose HTTP endpoints with Vert.x, and also how to consume them
-
How to compose asynchronous actions
-
How to use several languages in the same application
-
How to use databases with Vert.x
-
How to manage failures with async results, futures, exception handlers and circuit breakers
-
How to use Vert.x with RxJava
And many more…
2. Vert.x
We will attempt to explain Vert.x in just a few lines. Remember that we said in the previous section that Vert.x is "a toolkit for building reactive applications on the JVM".
There are a three important points in this description: toolkit, reactive and "on the JVM".
Firstly, Vert.x is a toolkit. Meaning, Vert.x is not an application server, a container nor a framework.
It’s not a JavaScript library either. Vert.x is a plain old jar file, so a Vert.x application is an application that uses this jar
file. Vert.x does not define a packaging model, all Vert.x components are plain boring
jar files. How does this impact you and your application? Let’s imagine you are using a build tool such as
Maven or Gradle, to make your application a Vert.x application just add the vertx-core dependency. Wanna use another
Vert.x components, just add it as a dependency. It’s simple, burden-less. Starting the application is a simple class
with the public static void main(String[] args) entry point. No specific IDE or plugin to install to start using Vert.x.
Therefore, to use the awesomeness provided by Vert.x, you just need to use it in your code, but be patient, this will be covered later.
Secondly, Vert.x is reactive. It is specifically made to build reactive applications, or more appropriately, systems. Reactive systems [1] has been defined in the Reactive Manifesto. Although, it’s not long document to read, we will further reduce it to these 4 bullet points:
-
Responsive: a reactive system needs to handle requests in a reasonable time (I let you define reasonable).
-
Resilient: a reactive system must stay responsive in the face of failures (crash, timeout,
500errors…), so it must be designed for failures and deal with them appropriately. -
Elastic: a reactive system must stay responsive under various loads. As a consequence, it must scale up and down, and be able to handle the load with minimal resources.
-
Message driven: components from a reactive system interacts using asynchronous message-passing.
Also, Vert.x is event-driven and also non-blocking. Events are delivered in an
event loop that must never be blocked. Let’s explain why. Unlike traditional, let’s say "enterprise", systems,
Vert.x uses a very small number of threads. Some of these threads are event loops, they are responsible for
dispatching the events in Handlers. If you block this thread, the events won’t be delivered anymore. This
execution model impacts how you write your code, instead of the traditional mofrl of blocking code, your code is
going to be asynchronous [2] and non-blocking [3].
As an example, if we wanted to retrieve a resoure from a URL, we would do something like this:
URL site = new URL("http://vertx.io/");
BufferedReader in = new BufferedReader(new InputStreamReader(site.openStream()));
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println(inputLine);
}
in.close();
But with Vert.x we are more likely to do:
vertx.createHttpClient().getNow(80, "vertx.io", "", response -> {
response.bodyHandler(System.out::println);
});
The main differences between these 2 codes are:
-
the first one is synchronous and potentially blocking : the instructions are executed in order, and may block the thread for a long time (because the web site may be slow or whatever).
-
the Vert.x one is asynchronous and non-blocking: the thread (event loop) is released while the connection with the HTTP server is established and so can do something else. When the response has been received, the same event loop calls the callback. Most of the Vert.x components are single-threaded (accessed only by a single thread), so no concurrency burden anymore. By the way, with Vert.x, even the DNS resolution is asynchronous and non-blocking (while Java DNS resolution is blocking).
Finally, Vert.x applications runs "on the JVM" the Java Virtual Machine (8+). This means Vert.x applications can be developed using any language that runs on the JVM. Including Java(of course), Groovy, Ceylon, Ruby, JavaScript, Kotlin and Scala. We can even mix and match any combination of all these languages. The polyglot nature of Vert.x application allows you use the most appropriate language for the task.
Vert.x lets you implement distributed applications, either by using the built-in TCP and HTTP server and client, but
also using the Vert.x event bus, a lightweight mechanism to send and receive messages. With the event bus, you send
messages to addresses. It supports three modes of distributions:
-
point to point: the message is sent to a single consumer listening on the address
-
publish / subscribe: the message is received by all the consumers listening on the address
-
request / reply: the message is sent to a single consumer and let it reply to the message by sending another message to the initial sender
Wao!, that’s a lot of information to process… However, you might still want to ask: What kind of applications can I use Vert.x for? We say, Vert.x is incredibly flexible - whether it’s simple network utilities, sophisticated modern web applications, HTTP/REST microservices, high volume event processing or a full blown backend message-bus application, Vert.x is a great fit. It’s fast, and does not constraint you. Last but not least, Vert.x provides appropriate tools to build reactive systems; systems that are: responsive, elastic, resilient and asynchronous!
3. Demystifying microservices
Except you spent the last year in a cave, you probably have heard about microservices. So what are microservices? To answer this questions, let’s quote from a veteran:
The microservice architectural style is an approach to developing a single application as a suite of small services, each running in its own process and communicating with lightweight mechanisms, often an HTTP resource API. These services are built around business capabilities and independently deployable by fully automated deployment machinery. There is a bare minimum of centralized management of these services, which may be written in different programming languages and use different data storage technologies.
http://martinfowler.com/articles/microservices.html
Microservice is an architectural style, which means it is a specialization of element and relation types, together with constraints and how they can be used [4]. I believe by now, I have left you more confused than you before we started.
Not to worry. Let’s take another approach. Why do we need microservices? In one word: agility [5]. Let’s imagine, we have an application, rather large. As a large application, the maintenance is a nightmare, adding features take too much time, the technology used is very outdated (What? Corba is not cool anymore?), any change needs to pass a 50-steps process and be validated by 5 levels of management. Obviously there are several teams on the application with different requirements and agendas. Well, we have such a monster app. How could we make the development and maintenance of this application efficient? Microservices are one answer to this question. It aims to reduce the time to production.
To do that end, the microservice architectural style proposes to:
-
split the application into a set of decoupled components providing defined
services(defined means with a known interface or API) -
allow the components communicate with whatever protocol the choose, often REST, but not necessarily
-
allow the components use whatever languages and technologies they want
-
allow each component be developed, released and deployed independently
-
allow the deployments be automated in their own pipeline
-
allow the orchestration of the whole application be reduced to the barest minimum
In this lab, we won’t address point 5, but you should know that Vert.x does not restricts how you deploy your components. You can employ whatever technology best suites your environment. whether it is ssh, ansible, puppet, docker, cloud, fabric8, or even floppy disk.
Point 6, however, is interesting, and often misunderstood. It’s pretty cool to develop independent pieces of software that magically interact at runtime. Yes, I said magically but in technology we don’t believe in magic. To make this happen, what we need is some form of service discovery at runtime. The service discovery mechanism can achieve it’s goal with any number of suitable means. These range from: hardcoding the service location in the code (which is generally a bad idea), using a DNS lookup service, or some more advanced techniques.
Having a service discovery mechanism allows our system components interact transparently amongst each other regardless of location or environment. It also allows us to easily load-balance amongst our components through a round robin alogorithm, for example, thereby making our system more fault-tolerant (by locating another service provider when one breaks down).
Although by definition, microservice applications are not required to be distributed, there usually are in practice. This comes with all the distributed application benefits and constraints: consensus computation (FLP), CAP theorem, consistency, monitoring, and many other reasons to fail. So microservices applications need to be designed to accommodate failures from their early implementation stage.
Before we go further, there are a couple of points I would like to mention. Microservices are not new and the concept is not rocket science. Academic papers from the 70’s and 80’s have defined (using different words) architectural styles very close to this. Also very important point to understand is: microservices are not a silver bullet. (Unless well managed) it has the capacity to increase the complexity of your application due to its distributed nature. Lastly, a microservice architecture will not fix all your issues.
The major concerns when it comes microservices are rapid delivery, adaptation, independence and replaceability. Every microservice is made to be replaceable by another providing the same service / API / interface (at the core, it’s basically an application of the Liskov substitution principle).
If you have been a developer for about 10 years, you might want to ask what difference is between microservices and SOA. For a lot of people it’s about size. This is not always true because services don’t necessarily have to be small which makes the term "microservice" quite misleading. Microservices and SOA differ purpose but the basic concepts are similar:
-
service : a defined feature accessible by an API, a client, a proxy, anything
-
service provider: a component implementing a service
-
service consumer: a component consuming a service
-
service discovery: the mechanism used by a consumer to find a provider
Both approaches inherit from the service oriented computing, aiming to decompose and manage independent pieces of software. You may have never heard about this even if you used it: COM, Corba, Jini, OSGi, and web services are all different implementations of service oriented computing.
Finally, there is a common misconception that microservices must be RESTful by nature. This can’t be farther from the truth. Microservices can employ any number interaction style that best fit their purpose: RPC, events, messages, streams etc. In this lab we will using RESTful services, async RPC, and message sources.
4. The Micro-Trader Application
Now that we know more about Vert.x and microservices, it’s time to discuss the application we are going to develop in this lab.
It’s going to be a fake financial app, where we will be making (virtual) money. The application is composed of a set of microservices:
-
The quote generator - this is an absolutely unrealistic simulator that generates the quotes for 3 fictional companies MacroHard, Divinator, and Black Coat. The market data is published on the Vert.x event bus.
-
The traders - these are a set of components that receives quotes from the quote generator and decides whether or not to buy or sell a particular share. To make this decision, they rely on another component called the portfolio service.
-
The portfolio - this service manages the number of shares in our portfolio and their monetary value. It is exposed as a service proxy, i.e. an asynchronous RPC service on top of the Vert.x event bus. For every successful operation, it sends a message on the event bus describing the operation. It uses the quote generator to evaluate the current value of the portfolio.
-
The audit - that’s the legal side, you know… We need to keep a list of all our operations (yes, that’s the law). The audit component receives operations from the portfolio service via an event bus and address . It then stores theses in a database. It also provides a REST endpoint to retrieve the latest set of operations.
-
The dashboard - some UI to let us know when we become rich.
Let’s have a look at the architecture:
The application uses several types of services:
-
HTTP endpoint (i.e. REST API) - this service is located using an HTTP URL.
-
Service proxies - these are asynchronous services exposed on the event bus using an RPC interaction mechanism, the service is located using an (event bus) address.
-
Message sources - these are components publishing messages on the event bus, the service is located using an (event bus) address.
These components runs in the same network (in this lab they will be on the same machine, but in different processes).
The dashboard presents the available services, the value of each company’s quotes, the latest set of operations made by our traders and the current state of our portfolio. It also shows the state of the different circuit breakers.
We are going to implement critical parts of this application. However, the rest of the code is provided to illustrate some other Vert.x features. The code that needs to be written by us is indicated using TODO and wrapped as follows:
//TODO
// ----
// your code here
// ----
5. Prerequisites
We will get to the good stuff, coding all the way soon… But before we start, we need to install a couple of software on our machine.
5.1. Hardware
-
Operating System: Mac OS X (10.8+), Windows 7+, Ubuntu 12+, CentOS 7+, Fedora 22+
-
Memory: At least 4 GB+, preferred 8 GB
5.2. Java Development Kit
We need a JDK 8+ installed on our machine. Latest JDK can downloaded from:
You can use either Oracle JDK or OpenJDK.
5.3. Maven
-
Download Apache Maven from https://maven.apache.org/download.cgi.
-
Unzip to a directory of your choice and add it to the
PATH.
5.4. IDE
We recommend you to use an IDE. You can use Eclipse, IntelliJ or Netbeans.
5.4.1. No IDE ?
If you don’t have an IDE, here are the step to get started with Eclipse.
-
First download Eclipse from the download page.
-
In the Eclipse Package list, select Eclipse IDE for Java Developers. It brings you to a download page with a
Downloadbutton. -
Once downloaded, unzip it.
-
In the destination directory, you should find an
Eclipsebinary that you can execute. -
Eclipse asks you to create a workspace.
-
Once launched, click on the Workbench arrow (top right corner).
6. Let’s start !
6.1. Getting the code
git clone https://github.com/cescoffier/vertx-microservices-workshop
You can import the code in your IDE as a Maven project. You can refer to your IDE documentation to know how to import Maven projects.
For Eclipse:
-
Click on
File - Import … - Maven - Existing Maven Projects -
Select the location where you cloned the sources
-
Click Finish and wait…
You are going to see a couple of compilation errors. This is because Eclipse does not mark the src/main/generated
directories as source root by default. Right-click on portfolio-service/src/main/generated and select Build Path
→ Use as Source Folder.
A complete solution is given in the solution directory.
|
6.2. Build
The build is managed using Apache Maven. Compile everything using:
cd vertx-microservices-workshop
mvn clean install
As said previously, Vert.x does not depend on Apache Maven. You can use any build tool (or even no build tool).
6.3. Clustering and Service Discovery infrastructure
It’s about time to start. If you remember the microservice section, we need some service discovery mechanism. Fortunately, Vert.x offers a service discovery mechanism. To let every component discover the services, it needs to store the service record in a location that every one can access.
By default, Vert.x service discovery is going to use a distributed map accessible by all the members of the cluster. So, when you start your Vert.x application and enable the cluster mode, it joins a cluster. This cluster lets nodes:
-
to come and go, and so manage member discovery (member != service)
-
to share data such as distributed maps, locks, counters…
-
to access the event bus
In our context, you don’t need to configure anything. The lab provides a cluster configuration using unicast and the
127.0.0.1 IP (so you won’t see the services from your neighbors). The cluster is built on top of Hazelcast. You can
check the configuration in vertx-workshop-common/src/main/resources/cluster.xml.
7. The first microservice - the quote generator
Disclaimer, the quote generator is unrealistic, and is basically a set of randoms…
7.1. Project structure
Let’s have a look to the project, as every other project are structured the same way.
.
├── README.md <--- component description
├── pom.xml <--- the Maven file
└── src
├── conf
│ └── config.json <--- a configuration file that is passed when the application starts
└── main
└── java
└── io.vertx.workshop.quote
├── GeneratorConfigVerticle.java <--- the verticles
├── QuoteVerticle.java
└── RestQuoteAPIVerticle.java
Let’s start with the pom.xml file. This file specifies the Maven build:
-
Define the dependencies
-
Compile the java code and process resources (if any)
-
Build a fat-jar
A fat-jar (also called uber jar or shaded jar) is a convenient way to package a Vert.x application. It creates a
über-jar containing your application and all its dependencies, including Vert.x. Then, to launch it, you just need
to use java -jar …., without having to handle the CLASSPATH. Wait, I told you that Vert.x does not dictate a
type of packaging. It’s true, fat jars are convenient, but it’s not the only way, you can use plain (not fat) jars,
OSGi bundles…
The created fat-jar is configured to use a main class provided in vertx-workshop-common (io.vertx.workshop
.common.Launcher). This Launcher class creates the Vert.x instance, configure it and deploys the main-verticle.
Again, it’s just a convenient way, you can use your own main class.
7.2. Verticle
As you may have noticed, the code is structured in 3 verticles, but what are these? Verticles is a way to structure
Vert.x application code. It’s not mandatory, but it is quite convenient. A verticle is a chunk of code that is
deployed on top of a Vert.x instance. A verticle has access to the instance of vertx on which it’s deployed, and
can deploy other verticles.
Let’s open the GeneratorConfigVerticle class, and look at the start method:
@Override
public void start() {
super.start();
JsonArray quotes = config().getJsonArray("companies");
for (Object q : quotes) {
JsonObject company = (JsonObject) q;
// Deploy the verticle with a configuration.
vertx.deployVerticle(MarketDataVerticle.class.getName(),
new DeploymentOptions().setConfig(company));
}
vertx.deployVerticle(RestQuoteAPIVerticle.class.getName());
publishMessageSource("market-data", ADDRESS, rec -> {
if (!rec.succeeded()) {
rec.cause().printStackTrace();
}
System.out.println("Market-Data service published : " + rec.succeeded());
});
publishHttpEndpoint("quotes", "localhost", config().getInteger("http.port", 8080), ar -> {
if (ar.failed()) {
ar.cause().printStackTrace();
} else {
System.out.println("Quotes (Rest endpoint) service published : " + ar.succeeded());
}
});
}
Verticles can retrieve a configuration using the config() method. Here it gets the details about the companies to
simulate. The configuration is a JsonObject. Vert.x heavily uses JSON, so you are going to see a lot of JSON in
this lab. For each company found in the configuration, it deploys another verticle with the extracted configuration.
Finally, it deploys another verticle providing a very simple HTTP API.
The last part of the method is about the service discovery mentioned in the microservice section. This component
generates quotes sent on the event bus. But to let other components discover where the messages are sent (where
means on which address), it registers it. market-data is the name of the service, ADDRESS is the event bus
address on which the messages are sent. The last argument is a Handler that is notified when the registration has
been completed. The handler receives a structure called AsyncResult.
Remember, Vert.x is promoting an asynchronous, non-blocking development model. Publishing the service may take time
(actually it does as it creates a record, write it to the backend, and notifies everyone), as we cannot block the
event loop, the method is asynchronous. Asynchronous methods have a Handler as last parameter that is invoked when
the operation has been completed. This Handler is called with the same event loop as the one having called the async
method. As the asynchronous operation can fail, the Handler receives as parameter an AsyncResult telling
whether or not the operation has succeeded. You will see the following patterns a lot in Vert.x applications:
// Asynchronous method returning an object of type X
operation(param1, param2, Handler<AsyncResult<X>>);
// Handler receiving an object of type X
ar -> {
if (ar.succeeded()) {
X x = ar.result();
// Do something with X
} else {
// it failed
Throwable cause = ar.cause();
}
}
If you remember the architecture, the quote generator also provides a HTTP endpoint returning the last values of the quotes (but, you are going to work on it). As the previous service, it needs to be published. For the publication it gives details on its locations (host, port…):
publishHttpEndpoint("quotes", "localhost", config().getInteger("http.port", 8080), ar -> {
if (ar.failed()) {
ar.cause().printStackTrace();
} else {
System.out.println("Quotes (Rest endpoint) service published : " + ar.succeeded());
}
});
7.3. The quote REST endpoint
It’s time for you to develop some parts of the application (I know you have pins and needles in your fingers). Open the
RestQuoteAPIVerticle. It’s a verticle class extending AbstractVerticle. In the start method you need to:
-
Register an event bus consumer to collect the last quotations (in the
quotesmap) -
Handle HTTP requests to return the list of quotes, or a single quote if the
name(query) param is set.
Let’s do that….
7.3.1. Task - Implementing a Handler to receive events
The first action is about creating a Handler, so a method that is invoked on event. Handlers are an important part of Vert.x, so it’s important to understand what they are.
In this task, the Handler is going to be called for each message sent on the event bus on a specific address (receiving each quote sent by the generator). It’s a message consumer. The message parameter is the received message.
Implement the logic that retrieve the body of the message (with the body() method. Then extract from the body the name of the quote and add an entry name → quote in the quotes map.
7.3.2. Task - Implementing a Handler to handle HTTP requests
Now that you did a first Handler, let’s do a second one. This handler does not receive messages from the event bus, but HTTP requests. The handler is attached to a HTTP server and is called every time there is a HTTP request to the server, the handler is called and is responsible for writing the response.
To handle the HTTP requests, we need a HTTP server. Fortunately, Vert.x lets you create HTTP servers using:
vertx.createHttpServer()
.requestHandler(request -> {...})
.listen(port, resultHandler);
Write the content of the request handler to respond to the request:
-
a response with the
content-typeheader set toapplication/json -
retrieve the
nameparameter (it’s the company name) -
if the company name is not set, return all the quotes as json.
-
if the company name is set, return the stored quote or a 404 response if the company is unknown
The response to a request is accessible using request.response()
|
To write the response use response.end(content).
|
To create the JSON representation of an object, you can use the Json.encode method
|
You may wonder why synchronization is not required. Indeed we write in the map and read from it without any synchronization constructs. Here is one of the main feature of Vert.x: all this code is going to be executed by the same event loop, so it’s always accessed by the same thread, never concurrently.
The Map<String, JsonObject> could be replaced by a simple JsonObject as they behave as a Map<String, Object>.
|
7.4. Time to start the quote generator
First, let’s build the microservice fat-jar. In the terminal, execute:
cd quote-generator mvn package
Then, open a new terminal and launch:
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar
This command launches the application. The main class we used creates a clustered Vert.x instance and reads the
configuration from src/conf/config.json. This configuration provides the HTTP port on which the REST service is
published (35000).
Let’s now open a browser and have a look to http://localhost:35000.
It should return something like:
{
"MacroHard" : {
"volume" : 100000,
"shares" : 51351,
"symbol" : "MCH",
"name" : "MacroHard",
"ask" : 655.0,
"bid" : 666.0,
"open" : 600.0
},
"Black Coat" : {
"volume" : 90000,
"shares" : 45889,
"symbol" : "BCT",
"name" : "Black Coat",
"ask" : 654.0,
"bid" : 641.0,
"open" : 300.0
},
"Divinator" : {
"volume" : 500000,
"shares" : 251415,
"symbol" : "DVN",
"name" : "Divinator",
"ask" : 877.0,
"bid" : 868.0,
"open" : 800.0
}
}
It gives the current details of each quotes. The data is updated every 3 seconds, so refresh your browser to get the latest data.
Let’s now launch the dashboard. In another terminal, navigate to $project-home/trader-dashboard and execute:
mvn clean package
java -jar target/trader-dashboard-1.0-SNAPSHOT-fat.jar
Then, open your browser to http://localhost:8080. You should see:
Some parts have no content, and it’s expected as it’s just the beginning…
7.5. You are not a financial expert ?
So maybe you are not used to the financial world and words… Neither am I, and this is a overly simplified version. Let’s define the important fields:
-
name: the company name -
symbol: short name -
shares: the number of stock that can be bought -
open: the stock price when the session opened -
ask: the price of the stock when you buy them (seller price) -
bid: the price of the stock when you sell them (buyer price)
You can check Wikipedia for more details.
8. Event bus services - Portfolio service
In the quote generator we have seen the basics of Vert.x development:
-
Asynchronous API and
AsyncResult -
Implementing
Handler -
Receiving messages from the event bus
In the portfolio component, we are going to implement an event bus service. A Portfolio stores the owned shares and
the available cash.
8.1. RPC and Async RPC
Microservices are not only about REST. They can be exposed using any types of interactions, and Remote Procedure Calls is one of them. With RPC, a component can effectively send a request to another component by doing a local procedure call, which results in the request being packaged in a message and sent to the callee. Likewise, the result is sent back and returned to the caller component as the result of the procedure call:
Such interactions has the advantages to introduce typing, and so is less error-prone than unstructured messages. However, it also introduces a tighter coupling between the caller and the callee. The caller knows how to call the callee:
-
how the service is called
-
where the service is living (location)
Traditional RPC exhibits an annoying drawback: the caller waits until the response has been received. This is definitely a blocking call, as it involves at least two network messages. In addition, it is not really designed for failures while distributed communications have many reasons to fail.
Fortunately, Vert.x proposes a different form of RPC: Async RPC. Async RPC follows the same principles as RPC, but
instead of waiting for the response, it passes a Handler<AsyncResult<X> called when the result is received.
The AsyncResult notifies the Handler whether the invocation succeeded or failed. Upon success, the handler can
retrieve the result.
Such async-RPC has several advantages:
-
the caller is not blocked
-
it deals with failures
-
it avoids you to send messages on the event bus and manages object marshalling and unmarshalling for you.
8.2. Async service interface
To create an async RPC service, or event bus service, or service proxies, you first need a Java interface declaring the
async methods. Open the io.vertx.workshop.portfolio.PortfolioService class.
The class is annotated with:
-
ProxyGen- enables the event bus service proxy and server generation -
VertxGen- enables the creation of the proxy in the different language supported by Vert.x
Let’s have a look at the first method:
void getPortfolio(Handler<AsyncResult<Portfolio>> resultHandler);
This method lets you retrieve a Portfolio object. As explained above the method is asynchronous and so has a Handler
parameter receiving an AsyncResult<Portfolio>. The other methods follows the same pattern.
You may have also noticed that the package has a package-info.java file. This file is required to enable the
service proxy generation.
|
8.3. Data objects
The Portfolio object is a data object. Event bus proxies support a limited set of types, and for non-supported types,
it must use data objects (please check the documentation for the whole list
of supported types). Data objects are Java classes obeying to a set of constraints:
-
It must be annotated with
DataObject -
It must have an empty constructor, a copy constructor and a constructor taking a
JsonObjectas parameter -
It must have a
toJsonmethod building aJsonObjectrepresenting the current object -
Fields must be property with (getters and setters)
Let’s open the io.vertx.workshop.portfolio.Portfolio class to see how it looks like. As you can see, all the JSON
handling is managed by converters that are automatically generated, so a data object is very close to a simple bean.
8.4. Implementing the service
It’s nice to have an async interface for our service, but it’s time to implement it. We are going to implement three methods in this service:
-
getPortfolioto understand how to createAsyncResultobjects -
sendActionOnTheEventBusto see how to send messages on the event bus -
evaluatecomputing the current value of the portfolio
8.4.1. Task - Creating AsyncResult instances
As we have seen above, our async service have Handler<AsyncResult<Portfolio>> parameter. So when we implement this
service, we would need to call the Handler with an instance of AsyncResult. To see how this works, let’s
implement the getPortfolio method:
In io.vertx.workshop.portfolio.impl.PortfolioServiceImpl, fill the getPortfolio method. It should call the handle method of the resultHandler with a successful async result. This object can be created from the (Vert.x) Future method.
But, wait, what is the relationship between AsyncResult and Future ? A Future represents the result of an action
that may, or may not, have occurred yet. The result may be null if the Future is used to detect the completion of
an operation. The operation behind a Future object may succeed or fail. AsyncResult is a structure describing the
success of the failure of an operation. So, Future are AsyncResult. In Vert.x AsyncResult instances are
created from the Future class.
AsyncResult describes:
-
a success as shown before, it encapsulates the result
-
a failure, it encapsulates a
Throwableinstance
Did you know that the term Future has been introduced in 1977, Promise in 1976… Not really new things.
|
So, how does this work with our async RPC service, let’s look at this sequence diagram:
8.4.2. Task - Sending event on the event bus
In the previous chapter, we have registered a consumer receiving event bus services, it’s time to see how to send
messages on the event bus. You access the event bus using vertx.eventBus(). From this object you can:
-
send: send a message in point to point mode -
publish: broadcast a message to all consumers registered on the address -
sendwith aHandler<AsyncResult<Message>>>: send a message in point to point mode and expect a reply
In the last point, notice the AsyncResult<Message>. It’s an async result as the reply may never arrive (and so will
be considered as a failure).
Ok, back to our code. We have provided the buy and sell methods, that are just doing some checks before buying or
selling shares. Once the action is emitted, we send a message on the event bus that will be consumed by the Audit
Service and the Dashboard. So, we are going to use the publish method.
Write the body of the sendActionOnTheEventBus method in order to publish a message on the EVENT_ADDRESS address containing a JsonObject as body. This object must contains the following entries:
-
action → the action (buy or sell)
-
quote → the quote as Json
-
date → a date (long in milliseconds)
-
amount → the amount
-
owned → the updated (owned) amount
8.4.3. Task - Coordinating async methods and consuming HTTP endpoints - Portfolio value evaluation
The last method to implement is the evaluate method. This method computes the current value of the portfolio.
However, for this it needs to access the "current" value of the stock (so the last quote). It is going to consume
the HTTP endpoint we have implemented in the quote generator. For this, we are going to:
-
discover the service
-
call the service for each company we own some shares
-
when all calls are done, compute the value and send it back to the caller
That’s a bit more tricky, so let’s do it step by step. First, in the evaluate, we need to retrieve the HTTP
endpoint (service) provided by the quote generator. This service is named quotes. We published in in the previous
section. So, let’s start to get this service.
Fill the evaluate method to retrieve the quotes service. You can retrieve Http services using HttpEndpoint.getClient. The name of the service is quotes. If you can’t retrieve the service, just passed a failed async result to the handler. Otherwise, call computeEvaluation.
Here is how the computeEvaluation method is implemented:
private void computeEvaluation(HttpClient httpClient, Handler<AsyncResult<Double>> resultHandler) {
// We need to call the service for each company we own shares
List<Future> results = portfolio.getShares().entrySet().stream()
.map(entry -> getValueForCompany(httpClient, entry.getKey(), entry.getValue())) (1)
.collect(Collectors.toList());
// We need to return only when we have all results, for this we create a composite future.
// The set handler is called when all the futures has been assigned.
CompositeFuture.all(results).setHandler( (2)
ar -> {
double sum = results.stream().mapToDouble(fut -> (double) fut.result()).sum(); (3)
resultHandler.handle(Future.succeededFuture(sum)); (4)
});
}
First, we need to get a list of Future that would receive the different evaluations (one per company) (1). This
evaluation is asynchronous (as it involves a HTTP call to get the latest value). We don’t know when these Future
will be all valuated (or assigned). Fortunately, Vert.x provides CompositeFuture for this very purpose (2).
CompositeFuture.all calls its assigned handler when all the given Futures are assigned. So when the handler is
executed, we knows all the futures has received a value, and so we can compute the sum (3). Finally, we send this
result to the client by calling the resultHandler (4).
Well, we just need the getValueForCompany method that call the service. Write the content of this method. You would need to create a Future object to report the completion of the operation. This future is the "returned" result of the method. Then, call the HTTP endpoint (/?name= + encode(company)).
The encode(String) method is provided for you.
|
When the response arrives, check the status (should be 200) and retrieve the body (with bodyHandler). The body can be parsed as a JsonObject using buffer.toJsonObject(). The value you compute is the numberOfShares * the bid price (read from the body). Once the value is computed, complete the future. Don’t forget to report failures to the future too. To simplify, if the company is unknown (meaning the response status code is not 200) we assume the value of the shares to be 0.0.
8.5. Task - Publishing a service
Now that the service implementation is complete, let’s publish it ! First we need a verticle that creates the actual
service object, registers the service on the event bus and publishes the service in the service discovery
infrastructure.
Open the io.vertx.workshop.portfolio.impl.PortfolioVerticle class. In its start method is does what we just say:
1) Create the service object with:
PortfolioServiceImpl service = new PortfolioServiceImpl(vertx, discovery, config().getDouble("money", 10000.00));
2) Register it on the event bus using the ProxyHelper class:
ProxyHelper.registerService(PortfolioService.class, vertx, service, ADDRESS);
3) Publish the service in the service discovery infrastructure to make it discoverable:
publishEventBusService("portfolio", ADDRESS, PortfolioService.class, ar -> {
if (ar.failed()) {
ar.cause().printStackTrace();
} else {
System.out.println("Portfolio service published : " + ar.succeeded());
}
});
The publishEventBusService is implemented as follows:
// Create the service record:
Record record = EventBusService.createRecord(name, address, serviceClass);
// Publish it using the service discovery
discovery.publish(record, ar -> {
if (ar.succeeded()) {
registeredRecords.add(record);
completionHandler.handle(Future.succeededFuture());
} else {
completionHandler.handle(Future.failedFuture(ar.cause()));
}
});
Are we done ? No…. We have a second service to publish. Remember, we are also sending messages on the event bus when we buy or sell shares. This is also a service (a message source service to be exact).
At the end of the start method, write the code required to publish the portfolio-events service. EVENT_ADDRESS is the event bus address.
there are publish methods available depending of your service type.
|
Now we are done, and it’s time to build and run this service.
8.6. Run time !
To build the project launch:
cd portfolio-service mvn clean package
Then, launch it, in another terminal with:
java -jar target/portfolio-service-1.0-SNAPSHOT-fat.jar
Here you go, the portfolio service is started. It discovers the quotes service and is ready to be used.
Go back to the dashboard, and you should see some new services and the cash should have been set in the top left corner.
| The dashboard is consuming the portfolio service using the async RPC mechanism. A client for JavaScript is generated at compile time, and use SockJS to communicate. Behind the hood there is a bridge between the event bus and SockJS. |
Well, it’s time to buy and sell some shares no ? Let’s do that in the next chapter.
9. The compulsive traders
In the portfolio project we have implemented an event bus service to manage our portfolio. In the quote-generator
we send quotes on the event bus. Traders are components that consumes these two services with only one goal: getting
rich (or not…)!
In this section, we are going to develop 2 traders (following a stupid logic you can definitely improve):
-
the first trader is developed in Java
-
the second trader is developed in Groovy
9.1. Compulsive and dumb traders
Before seeing how these are implemented, let’s explain the absolutely illogic algorithm used by these traders:
-
A compulsive trader is picking randomly one company name and a number of shares (
x) -
Randomly, it tries to buy or sell
xshares of the company
It does not check whether or not it has enough shares or money, it just tries… This logic is implemented in
io.vertx.workshop.trader.impl.TraderUtils.
9.2. Deploying several instance and polyglot verticles.
The compulsive-trader project contains a main-verticle (io.vertx.workshop.trader.impl.MainVerticle) that is
going to configure and deploy the traders:
@Override
public void start() throws Exception {
// Java traders
vertx.deployVerticle(JavaCompulsiveTraderVerticle.class.getName(),
new DeploymentOptions().setInstances(2)); (1)
// Groovy traders...
vertx.deployVerticle("GroovyCompulsiveTraderVerticle.groovy"); (2)
}
The JavaCompulsiveTraderVerticle is deployed with some DeploymentOptions (1). This one sets the number of
instances to 2, so Vert.x is not instantiating this verticle once, with twice (2 different objects).
So, the previous code deploys three traders.
The Groovy verticle is deployed by passing the file name. The verticle file is in src/main/resources and will be
located at the root of the fat-jar.
It’s now time to implement these verticles
9.3. The Java Trader
Open the io.vertx.workshop.trader.impl.JavaCompulsiveTraderVerticle class. In the place of the TODO we need:
-
Initialize the trader
-
Retrieve the 2 services we use
-
When both have been retrieve, apply the trading logic on every new market data.
9.3.1. Task - 1. Initialize the trader
First, look at the start method signature: start(Future<Void> future). The future let us tell when the
initialization has been completed and whether or not it has been successful. That means we have to complete or
fail this future explicitly.
To initialize the trader, remove the future.fail statement and initialize the company and numberOfShares variables by using the TraderUtils class.
9.3.2. Task - 2. Retrieving several services
The trader needs the Portfolio service and the market service (the message source sending the market data). We
cannot start the trading logic before having retrieved both of them. Let’s use the Future composition feature we
have seen in the previous section.
Write the code (just after the previous code you wrote) required to retrieve the 2 services. When both are retrieved (use the all composition operator), just do nothing. The handler will be filled in the next step.
9.3.3. Task - 3. Applying the trading logic
Almost done! Now write the last handler. If the retrieval as failed, just report the failure on future. Otherwise, registers a message consumer on the market service and every time you get a message, apply the TraderUtils.dumbTradingLogic method. Then complete future.
9.3.4. The code in one snippet
Here is the whole code.
9.3.5. Run this trader
We can already run this trader and see if it makes educated actions on the market. Package it using:
cd compulsive-traders
mvn clean package
Then launch the application with:
java -jar target/compulsive-traders-1.0-SNAPSHOT-fat.jar
If you go back to the dashboard, you may start seen some moves on your portfolio.
9.4. Task - Writing a Groovy Verticle
The Groovy trader is using the same trading logic, but this verticle is going to be developed in Groovy. To ease the understanding, the code is going to be very close to the Java version.
Open src/main/resources/GroovyCompulsiveTraderVerticle.groovy. This verticle is going to be a Groovy Script. So
the content is the start method of the verticle. Vert.x also supports Groovy classes.
If you don’t know Groovy, just copy and paste the solution. If you do, you can try to implement the trader by yourself following the same logic as the Java trader:
-
Groovy version of the Vert.x API are in
io.vertx.groovy.x.y, for instanceio.vertx.groovy.core.CompositeFuture. -
Json Objects are Groovy Maps, so a
MessageConsumer<JsonObject>in Java is aMessageConsumer<Map>in Groovy.
It’s time to rebuild and restart our traders. Hit CTRL+C to stop the running trader. Then, rebuild with:
mvn clean package
And launch the application with:
java -jar target/compulsive-traders-1.0-SNAPSHOT-fat.jar
If you go back to the dashboard, you may start seen some moves on your portfolio. Now 3 traders are trying to make you (virtually) rich.
10. The Audit service
The law is the law… The Sarbanes–Oxley Act requires you to keep a track of every transaction you do on a financial market. The audit service records the shares you buy and sell in a database. It’s going to be a HSQL database, but is would be similar with another database, even no-sql database.
10.1. Accessing data asynchronously
As said previously, Vert.x is asynchronous and you must never block the event loop. And you know what’s definitely blocking ? Database accesses and more particularly JDBC! Fortunately, Vert.x provides a JDBC client that is asynchronous.
The principle is simple (and is applied to all clients accessing blocking systems):
Worker ? Yes, Vert.x has the notion of workers (a separated thread pool) to execute blocking code. It can be a
verticle marked as worker or with the vertx.executeBlocking construct. However, even if possible, you should not
abuse from these features as it reduces the scalability of the application.
|
However, interactions with databases are rarely a single operation, but a composition of operations. For example:
-
Get a connection
-
Drop some tables
-
Create some tables
-
Close the connection
So, we need a way to compose these operations, and report failures when required. This is what we are going to see in the Audit component.
10.2. The Audit service
The Audit service:
-
Listens for the financial operations on the event bus
-
Stores the received operations in a database
-
Exposes a REST API to get the last 10 operations
Interactions with the database use the vertx-jdbc-client, an async version of JDBC. So expect to see some SQL code
(I know you love it).
10.3. Rxjava
10.3.1. Intro
Vert.x uses a simple callback based asynchrony and its Future object is an helper tool useful for callback
coordination. RxJava implements the Reactive Extensions for the JVM and is a library for composing asynchronous and
event-based programs.
With RxJava, you model your code around data flow (called Observable). These data flow are pipes in which data
transits. These Observable can represent finite or infinite streams, but also streams with a single element. When
a stream is known to hold exactly one element, it is convenient to use the Single type. Finally a Completable
represents a stream with no elements, i.e it can only complete without a value or fail.
To use such reactive type, a subscription operation is necessary.
Observable<String> observable = getStringObservable();
// Subscribe to the stream
observable.subscribe(item -> {
// Received a String item
}, error -> {
// Error termination => no more items
}, () -> {
// Normal termination => no more items
});
Singles are simpler to work with as they hold exactly one element, they have some similarities with future/promises although they have noticeable differences:
-
a future/promise is the result of an asynchronous operation, e.g, you start a server and you get a promise of the server bind result
-
a single result usually has a side effect at subscription time, e.g you subscribe to the single, as side effect it start the server and the single notifies you of the bind result
Single<String> single = getStringSingle();
// Subscribe to the single
single.subscribe(item -> {
// Completion with the string item
}, error -> {
// Completion with an error
});
10.3.2. Composition and transformation
RxJava provides a very useful set of operators for composing and transforming asynchronous flows.
We will use the main ones in this lab : map, flatMap and zip.
The map operator transforms synchronously the result of an operation.
// Transform the stream of strings into a stream of Buffer
Observable<Buffer> observable = getStringObservable().map(s -> vertx.fileSystem().readFileBlocking(s));
// Transform the string single into a Buffer single
Single<Buffer> single = getStringSingle().map(s -> vertx.fileSystem().readFileBlocking(s));
The drawback of the map operator is the imposed synchrony, in order to retrieve the content of a file we have
to use the blocking version of the filesystem, and thus we break the Vert.x golden rule!
Fortunately there is an asynchronous version called flatMap.
// Transform the stream of strings into a stream of Buffer
Observable<Buffer> observable = getStringObservable().flatMap(s -> {
Single<Buffer> single = vertx.fileSystem().rxReadFile();
return single.toObservable();
});
// Transform the string single into a Buffer single
Single<Buffer> single = getStringSingle().flatMap(s -> {
Single<Buffer> single = vertx.fileSystem().rxReadFile();
return single;
});
The zip operator combines the results of several Observable/Single in a single result, let’s see with Single:
Single<String> single1 = getStringSingle();
Single<String> single2 = getStringSingle();
Single<String> single3 = getStringSingle();
Single<String> combinedSingle = Single.zip(single1, single2, single3, (s1,s2,s3) -> s1 + s2 + s3);
combinedSingle.subscribe(s -> {
// Got the three concatenated strings
}, error -> {
// At least one of single1, single2 or single3 failed
});
It works similarly for Observable, but for the sake of the conciseness we will not study it here.
10.4. Vert.x Rx
Vert.x has an Rx version of its asynchronous API packaged with the io.vertx.rxjava prefix, e.g io.vertx.rxjava.core.Vertx
is the Rx version of io.vertx.core.Vertx. The rxified version of Vert.x exposes the asynchronous methods as
Single and the stream types as Observable.
10.4.1. Vert.x streams
The type ReadStream<T> models a reactive sequence of T items, for instance an HttpServerRequest is a ReadStream<Buffer>.
The rxified version exposes a toObservable() method to turn the stream into an Observable<T>:
import io.vertx.rxjva.core.Vertx;
import io.vertx.rxjva.core.http.HttpServer;
...
Vertx vertx = Vert.vertx();
HttpServer server = vertx.createHttpServer();
server.requestHandler(request -> {
if (request.path().equals("/upload")) {
Observable<Buffer> observable = request.toObservable();
observable.subscribe(buffer -> {
// Got an uploaded buffer
}, error -> {
// Got an error => no more buffers
}, () -> {
// Done => no more buffers
});
}
});
In this section, we will not use Observable as we don’t focus much on reactive streams. However we will discover and
use Single.
10.4.2. Vert.x singles
Each asynchronous method, i.e a method having a last parameter Handler<AsyncResult<T>>, has an rxified version, named
after the original method name prefixe by rx, with the same parameters minus the last and returning a Single of the
asynchronous type.
Unlike the original method, calling the rx version does not make an actual call. Instead you get a single that will call the actual method at subscription time.
import io.vertx.rxjva.core.Vertx;
import io.vertx.rxjva.core.http.HttpServer;
...
Vertx vertx = Vert.vertx();
HttpServer server = vertx.createHttpServer();
server.requestHandler(request -> ...);
// The single has been created but the server is actually not starting at this point
Single<HttpServer> listenSingle = server.rxListen(8080);
// Triggers the actual start
listenSingle.subscribe(
server -> {
// The server is started and bound on 8080
}, error -> {
// The server could not start
});
10.5. Task - Composing methods returning Single.
Open the io.vertx.workshop.audit.impl.AuditVerticle class. The first important detail of this verticle is its
start method. As the start method from the Java trader, the method is asynchronous, and report its completion in the
given Future object:
public void start(Future<Void> future) {
super.start();
// creates the jdbc client.
jdbc = JDBCClient.createNonShared(vertx, config());
// TODO
// ----
Single<MessageConsumer<JsonObject>> ready = Single.error(new UnsupportedOperationException("not yet implemented"));
// ----
readySingle.doOnSuccess(consumer -> {
// on success we set the handler that will store message in the database
consumer.handler(message -> storeInDatabase(message.body()));
}).subscribe(consumer -> {
// complete the verticle start with a success
future.complete();
}, error -> {
// signal a verticle start failure
future.fail(error);
});
}
Vert.x would consider the verticle deploy when the Future is valuated. It may also report a failure if the verticle
cannot be started correctly.
Initializing the audit service includes:
-
prepare the database (table)
-
start the HTTP service and expose the REST API. In addition publish this endpoint as a service
-
retrieve the message source on which the operation are sent
So, it’s clearly 3 independent actions, but the audit service is started only when all of them has been completed.
Replace the TODO block with some code. This code should retrieves 3 single objects (from methods provided in the class)
and wait for the completion of the three tasks. The three singles should be combined in one Single<MessageConsumer<JsonObject>>.
On success this single registers a message listener on the portfolio message source storing the operation in the database for each received message.
Its completion notifies Vert.x that the start process is completed (or successfully or not), it calls future.complete() and
future.fail(cause).
10.6. Task - Implementing a method returning a Single
Now our verticle extends the RxMicroServiceVerticle base class, this class provides the same helper method
than MicroServiceVerticle using Rx singles.
We have mentioned that async method have a signature with a Handler as last parameter. There is an equivalent syntax
that returns a Single object when the operations they are executing are completed:
void asyncMethod(a, b, Handler<AsyncResult<R>> handler);
// is equivalent to
Single<R> asyncMethod(a, b);
Indeed, the caller can subscribe on the returned Single object to execute the async operation and be notified when
the operation has completed or failed
Single<R> single = asyncMethod(a, b);
single.subscribe(r -> {
// Do something with the result
}, err -> {
// the operation has failed
});
Let’s implement the configureTheHTTPServer method following this pattern. In this method we are going to use a new Vert.x
Component: Vert.x Web. Vert.x Web is a Vert.x extension to build modern web application. Here we are going to use
a Router which let us implement REST APIs easily (à la Hapi or ExpressJS). So:
-
Create a
Routerobject with:Router.router(vertx) -
Register a route (on
/) on the router, callingretrieveOperations -
Create a HTTP server delegating the request handler to
router.accept. -
Retrieve the port passed in the configuration or
0if not set (it picks an available port), we can pick a random port as it is exposed in the service record, so consumer are bound to the right port. -
Start the server with the
rxListenversion of the listen method that returns a single.
So, the caller can call this method and get a Single. It can subscribe on it to bind the server and be notified
of the completion of the operation (or failure).
If you look at the retrieveThePortfolioMessageSource, you would see the very same pattern.
10.7. Using Async JDBC
In the start method, we are calling initializeDatabase. Let’s look at this method using another type of action
composition. This method:
-
get a connection to the database
-
drop the table
-
create the table
-
close the connection (whatever the result of the two last operations)
All these operations may fail.
In the last paragraph we have seen methods returning Single. Chains are a composition of such functions:
-
you have an input
-
you execute a first
Functiontaking the input from (1) and returning aSingle -
you execute a second
Functiontaking the input from (2) and returning aSingle -
….
The completion of a chain is a Single object. If one of the chained operation fails, this Single is marked as
failed, otherwise it is completed with the result of the last operation:
Single<X> chain = input.flatMap(function1).flatMap(function2).flatMap(function3);
So to use the composition pattern, we just need a set of Functions and a Single that would trigger the chain.
Let’s create this Single first:
// This is the starting point of our Rx operations
// This single will be completed when the connection with the database is established.
// We are going to use this single as a reference on the connection to close it.
Single<SQLConnection> connectionRetrieved = jdbc.rxGetConnection();
Then, we need compose the single with the flatMap method that is taking a SQLConnection as parameter and returns
a single that contains the result of the database initialization.
-
we create the batch to execute
-
the
rxBatchexecutes the batch gives us the single returns of the operation -
finally we close the connection with
doAfterTerminate
Single<List<Integer>> resultSingle = connectionRetrieved
.flatMap(conn -> {
// When the connection is retrieved
// Prepare the batch
List<String> batch = new ArrayList<>();
if (drop) {
// When the table is dropped, we recreate it
batch.add(DROP_STATEMENT);
}
// Just create the table
batch.add(CREATE_TABLE_STATEMENT);
// We compose with a statement batch
Single<List<Integer>> next = conn.rxBatch(batch);
// Whatever the result, if the connection has been retrieved, close it
return next.doAfterTerminate(conn::close);
});
The resultSingle is the final result providing a Single<List<Integer>> but we will return only a Single<Void>
as the caller only cares about the global result and not the detail.
This is simple achieved with the map operations on the single:
return resultSingle.<Void>map(null);
And voilà!
10.8. Task - Async JDBC with a callback-based composition
You may ask why we do such kind of composition. Let’s implement a method without any composition operator (just using
callbacks). The retrieveOperations method is called when a HTTP request arrives and should return a JSON object
containing the last 10 operations. So, in other words:
-
Get a connection to the database
-
Query the database
-
Iterate over the result to get the list
-
Write the list in the HTTP response
-
Close the database
The step (1) and (2) are asynchronous. (5) is asynchronous too, but we don’t have to wait for the completion. In this
code, don’t use composition (that’s the purpose of this exercise). In retrieveOperations, write the required code using Handlers / Callbacks.
So obviously it’s possible too not use composition. But imagine when you have several asynchronous operation to chain, it become a callback hell very quickly. So, as a recommendation: use the Vert.x composition operators or use the rxified version of Vert.x API.
10.9. Show time !
Let’s see how this works.
First you need to built it:
cd audit-service
mvn clean package
Then, you need to launch the application:
java -jar target/audit-service-1.0-SNAPSHOT-fat.jar
Restart and refresh the dashboard, and you should see the operations in the top right corner!
11. Anatomy of the dashboard
This section is about the dashboard. It covers:
-
How to configure configure Vert.x web to expose static resources
-
How to implement a REST endpoint delegating to another REST endpoint (proxy pattern)
-
How to protect microservices interaction against failures (exception handler, timeout, circuit breaker)
-
How to configure the SockJS - Eventbus bridge
-
How can you consume an event bus proxy from the browser
The dashboard is a single verticle (io.vertx.workshop.dashboard.DashboardVerticle).
11.1. Vert.x Web and static files
As mentioned in the previous section, Vert.x web is a Vert.x component to build web application. Its whole
architecture is centered on the Router object. You create this router and configure the routes. For each route
you configure the HTTP verb and the path and associate the Handler that is called when a matching request is
received. The router is creates a follows:
Router router = Router.router(vertx);
Vert.x web provides a set of Handler for common tasks such as serving static files:
// Static content
router.route("/*").handler(StaticHandler.create());
It serves all files from the webroot directory (default) or the server root. For example, webroot/index.html is
served using the http://0.0.0.0:8080/index.html url.
Once the router is configured, you need a HTTP server and use the router to handle the requests:
vertx.createHttpServer()
.requestHandler(router::accept)
.listen(8080);
11.2. Delegating REST calls
It’s often required to implement a REST API on top of another one. This pattern can be very costly on traditional architecture as each call would block a thread until the call to this second REST API has completed. With Vert.x delegation is easy, asynchronous and non blocking.
For example in the dashboard, we want to retrieve the list of operations. This list is offered by the audit service. So in the dashboard we have this route definition:
router.get("/operations").handler(this::callAuditService);
And the handler is:
private void callAuditService(RoutingContext context) {
if (client == null) {
context.response()
.putHeader("content-type", "application/json")
.setStatusCode(200)
.end(new JsonObject().put("message", "No audit service").encode());
} else {
client.get("/", response -> {
response
.bodyHandler(buffer -> {
context.response()
.putHeader("content-type", "application/json")
.setStatusCode(200)
.end(buffer);
});
})
.end();
}
}
The audit service is retrieved in the verticle start method. If the service was not found, it returns a no audit service message. Otherwise, it uses the retrieved HTTP client to call the audit REST API. The response is simply sent back to the HTTP response.
11.3. Task - Managing failures with exception handler and timeout
The callAuditService method we have seen in the previous section does the job… but well, what we do know about
distributed systems: they fail. It would be better to manage failures in this method. Vert.x proposes four
ways to handle failures:
-
failed asynchronous results and future
-
exception handlers
-
timeout
-
circuit breaker
We have already covered the first point. In this section we cover the point 2 and 3. The next section is about circuit breakers.
Exception handlers are handlers receiving a Throwable parameter and are responsible for managing the reported error. It is used on object that does not called a Handler<AsyncResult<?>> such as on a HTTP request. During a HTTP request-response interaction, failures can happen at several places:
-
a connection issue - the server does not reply or cannot be found.
-
a response issue - the server is there, but the connection is lost while retrieving the content
-
a server issue - the server returns a failure
This last point is application specific and need to be handle by your code. The first two points are managed by exception handlers on the 1) HTTP request, 2) HTTP response objects. Vert.x uses two different handlers because the root issue is different.
Copy this method into another one called callAuditServiceWithExceptionHandler, and set the exceptionHandler on the HTTP request and HTTP response. Both should call the fail method on the RoutingContext parameter. In addition, on the request, configure the response timeout. The exception handler set on the request is called if the response cannot be retrieved before this timeout. Don’t forget to update the handler of the /operations route to call this new method.
Once done, build the dashboard with:
cd trader-dashboard mvn clean package
Then, launch it, in another terminal with:
java -jar target/trader-dashboard-1.0-SNAPSHOT-fat.jar
Refresh the dashboard page. In the audit service terminal, stop the service and check how is reacting the dashboard (you can look at the AJAX request in the inspector/dev tools). Then, relaunch the audit service. What is happening ?
11.4. Task - Managing failures with a circuit breaker
Circuit breaker is a reliability pattern that can be represented with a simple state machine:
This pattern is very popular in microservice based applications, because it recovers (if possible) from failures smoothly. The circuit breaker starts in the close state. The circuit breaker monitors an operation. Every time this operation fails, it increases a failure counter. When a threshold is reached, it goes to the open state. In this state, the required service is not called anymore, but a fallback is executed immediately. After some time, the circuit breaker goes into the half-open state. In this state, the operation is called for the first request. Other request are redirected to the fallback. If the operation fails, the circuit breaker goes back to the open state until the next attempt. If it succeed it goes back to the close state.
There are lots of implementations of circuit breakers, Netflix Hystrix being the most popular. Vert.x provides its own implementation. Indeed, using Hystrix can be a bit cumbersome (but possible) as it does not enforce the Vert.x threading model.
In the DashboardVerticle.java file, a circuit breaker (called circuit) is initialized in the start method:
circuit = CircuitBreaker.create(
"http-audit-service", (1)
vertx,
new CircuitBreakerOptions()
.setMaxFailures(2) (2)
.setFallbackOnFailure(true) (3)
.setResetTimeout(2000) (4)
.setTimeout(1000)) (5)
.openHandler(v -> retrieveAuditService()); (6)
| 1 | the circuit breaker name |
| 2 | the number of failure before switching to the open state |
| 3 | whether or not the fallback should be called when a failure is detected, even in the close state |
| 4 | the time to wait in the open state before switching to the half-open state |
| 5 | the time before considering an operation as failed, if it didn’t complete |
| 6 | a handler called when the circuit breaker switches to the open state. We try to retrieve the audit service. |
With this circuit breaker, write a callAuditServiceWithExceptionHandlerWithCircuitBreaker method managing the arrival and the departure of the audit service. For this, use the circuit.<Buffer>executeWithFallback method. Don’t forget to update the handler of the /operations route.
Once done, rebuild and restart the dashboard. Stop the audit service and see how it behaves. Restart it. You can see on the dashboard page the state of the circuit breaker.
11.5. SockJS - Event bus bridge
SockJS is a browser JavaScript library that provides a WebSocket-like object. SockJS gives you a coherent, cross-browser, Javascript API which creates a low latency, full duplex, cross-domain communication channel between the browser and the web server. Under the hood SockJS tries to use native WebSockets first. If that fails it can use a variety of browser-specific transport protocols and presents them through WebSocket-like abstractions. SockJS-client does require a server counterpart to handle the communication. And you know what, Vert.x implements it !
With the SockJS - Event bus bridge, it lets the browser send and receive messages from the event bus.
To enable the bridge you need the following code:
SockJSHandler sockJSHandler = SockJSHandler.create(vertx); (1)
BridgeOptions options = new BridgeOptions();
options
.addOutboundPermitted(new PermittedOptions().setAddress("market")) (2)
.addOutboundPermitted(new PermittedOptions().setAddress("portfolio"))
.addOutboundPermitted(new PermittedOptions().setAddress("service.portfolio"))
.addInboundPermitted(new PermittedOptions().setAddress("service.portfolio"));
sockJSHandler.bridge(options); (3)
router.route("/eventbus/*").handler(sockJSHandler); (4)
In (1), we create the SockJSHandler. It needs to be configured, as by default, for security reasons, no messages are
transmitted. A set of permitted addresses configures bridge (2). Outbound addresses are for messages from the event
bus to the browser, while inbound addresses are for messages from the browser to the event bus. Finally in (3) and
(4), it configures the handler and create a router in the router. The /eventbus/* path is used by the SockJS
client (in the browser) to negotiate the connection, receive and send the messages.
This is not the only bridge that exists for the event bus. There is also a TCP event bus bridge for native systems. Notice also, that the SockJS bridge can also be used from Node.JS.
11.6. Consuming event bus service from the browser
As said above, there is a bridge between SockJS and the event bus to let the browser send and receive messages. As event bus services communicate using event bus messages, it is possible to implement a service client in the browser. Vert.x generates this client for you.
So, if you open the index.html file, you can see:
<script src="libs/portfolio_service-proxy.js"></script>
This imports a script generated by Vert.x (in the portfolio project). Then we can use the service as follows:
var service = new PortfolioService(eventbus, "service.portfolio");
service.getPortfolio(function (err, res) {
// ....
}
Yes, you can call the service method directly from your browser.
12. Conclusion
You made it ! Or you jumped to this section. Anyway, congratulations. We hope you enjoy this lab and learn some stuff. There is many other things Vert.x can do and that was not illustrated here.
Don’t forget that reactive systems and so Vert.x requires a mind-shift:
-
Vert.x is a toolkit to build reactive systems
-
Asynchronous, non-blocking development model can be hard to understand at the first glance, but it becomes very convenient very quickly. Don’t also forget, computers are asynchronous, so using such development model is using it the right way to use it to its whole power.
If you want, and I hope so, to go further here are some references:
13. References
Some recommended reading. Nothing especially about microservices or Vert.x because the concepts are broader than these two topics.
-
A. S. Tanenbaum, M Van Steam. Distributed Systems - Principles and Paradigms. 2003
-
L. Bass, I. Weber, L. Zhu. Devops, A software Architect’s Perspective. 2015
-
P. Clements, F. Bachmann, L Bass, D. Garlan, J. Ivers, R. Little, P. Merson, R. Nord, J. Stafford. Documenting Software Architecture. 2010
-
S. Krakowiak. Middleware Architecture with Patterns and Frameworks. 2009 (unfinished), http://lig-membres.imag.fr/krakowia/Files/MW-Book/Chapters/Preface/preface.html
-
J. Lewis, M. Fowler. Microservices - a definition of this new architectural term, 2014, http://martinfowler.com/articles/microservices.html
14. Appendix A - Start, Stop and List commands
Even if not required, Vert.x provides a convenient Launcher class. This class is just the entry point of the
application. It initializes Vert.x from the given parameter, deploys the given verticle and so on.
First, this launcher is extensible. For instance, in this lab, we have our own launcher extending the Vert.x one (io
.vertx.workshop.common.Launcher). In addition, this launcher is actually composed by an extensible set of command.
run is the default command and runs the given verticle. By default some other commands are available:
bare Creates a bare instance of vert.x.
list List vert.x applications
run Runs a verticle called <main-verticle> in its own instance of
vert.x.
start Start a vert.x application in background
stop Stop a vert.x application
test Runs a Vert.x Unit test called <test-verticle> in its own instance
of vert.x.
You can also add your own command.
In this section we are going to focus on the start, stop and list command.
When we have launched our microservices, they were blocking your shell (they were not launched in background). The
start command lets you start a Vert.x application in background. For the quote-generator, you can launch it as
follows:
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar start -id quote-generator --redirect-output
The command name follows the fat jar name. It gets an id parameter giving a name to your application. By default,
it generates a UUID. The --redirect-output flag instructs Vert.x to redirect the output to the terminal.
Now, to list the Vert.x application running, you launch:
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar list
Listing vert.x applications...
quote-generator target/quote-generator-1.0-SNAPSHOT-fat.jar
Finally to stop the application, just launch:
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop quote-generator
So, with this, you can easily write a few commands launching all the microservices:
# Skip this instruction to use your version
cd solution
cd quote-generator
mvn clean package
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar start -id quote-generator --redirect-output
cd ..
cd portfolio-service
mvn clean package
java -jar target/portfolio-service-1.0-SNAPSHOT-fat.jar start -id portfolio-service --redirect-output
cd ..
cd compulsive-traders
mvn clean package
java -jar target/compulsive-traders-1.0-SNAPSHOT-fat.jar start -id compulsive-traders --redirect-output
cd ..
cd audit-service
mvn clean package
java -jar target/audit-service-1.0-SNAPSHOT-fat.jar start -id audit-service --redirect-output
cd ..
cd trader-dashboard
mvn clean package
java -jar target/trader-dashboard-1.0-SNAPSHOT-fat.jar start -id trader-dashboard --redirect-output
cd ..
Once everything is launched, you can use the list command to check what’s running:
java -jar target/trader-dashboard-1.0-SNAPSHOT-fat.jar list
Listing vert.x applications...
quote-generator target/quote-generator-1.0-SNAPSHOT-fat.jar
portfolio-service target/portfolio-service-1.0-SNAPSHOT-fat.jar
compulsive-traders target/compulsive-traders-1.0-SNAPSHOT-fat.jar
audit-service target/audit-service-1.0-SNAPSHOT-fat.jar
trader-dashboard target/trader-dashboard-1.0-SNAPSHOT-fat.jar
you can use any of the fat jar to launch the list and stop commands.
|
We can now have the opposite script stopping everything:
cd quote-generator
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop trader-dashboard
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop quote-generator
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop audit-service
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop compulsive-traders
java -jar target/quote-generator-1.0-SNAPSHOT-fat.jar stop portfolio-service
The stop command send a signal to the process to ask for the termination. However, this may take time to happen
(the termination). So, the process are not stopped immediately. You can check this with the list command.