<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: ndr_brt</title>
    <description>The latest articles on DEV Community by ndr_brt (@ndrbrt_23).</description>
    <link>https://dev.to/ndrbrt_23</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F382000%2Fbf9b05c0-c608-4ac9-b533-a9fdc3e4c5e9.jpeg</url>
      <title>DEV Community: ndr_brt</title>
      <link>https://dev.to/ndrbrt_23</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ndrbrt_23"/>
    <language>en</language>
    <item>
      <title>Zero Downtime Deployment with Docker Swarm</title>
      <dc:creator>ndr_brt</dc:creator>
      <pubDate>Mon, 21 Dec 2020 12:54:27 +0000</pubDate>
      <link>https://dev.to/cherrychain/zero-downtime-deployment-with-docker-swarm-25n2</link>
      <guid>https://dev.to/cherrychain/zero-downtime-deployment-with-docker-swarm-25n2</guid>
      <description>&lt;p&gt;If you are a software developer that in the past has dealt with production software, you are certainly familiar with this struggle:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;deployment time&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;I hope you have already put into effect measures to &lt;strong&gt;trust your deployment&lt;/strong&gt; by taking advantage of great practices such as Continuous Integration, Automated Testing, Continuous Delivery and so on, these should be carried out before tackling the thing I’m going to talk about. This is &lt;strong&gt;not&lt;/strong&gt; going to be a silver bullet.&lt;/p&gt;

&lt;p&gt;Today we’ll talk about &lt;strong&gt;zero downtime&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  the problem
&lt;/h2&gt;

&lt;p&gt;Imagine you have already packaged your application into a docker image, published it into a docker registry and have it up and running in production by the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run …
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Nothing wrong, your application is good to go.&lt;/p&gt;

&lt;p&gt;But… how can we update it?&lt;/p&gt;

&lt;p&gt;The easiest solution is to stop the old one and start the new version&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker stop &amp;lt;running container &lt;span class="nb"&gt;id&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;

docker run &amp;lt;new version&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There’s a problem with this approach: from the time you stop the old container and the complete bootstrap of the new version, your application will not respond.&lt;/p&gt;

&lt;h2&gt;
  
  
  the solution
&lt;/h2&gt;

&lt;p&gt;This problem is pretty common and can be solved in many ways. We discovered an almost “effort free” way using &lt;strong&gt;Docker Swarm&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;what is Docker Swarm?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Swarm is a Docker “mode” already included in your Docker installation. It’s a powerful cluster engine that will help you scale your application.&lt;/p&gt;

&lt;p&gt;Also, it will solve your downtime problems.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;how?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The idea is to transform your docker instance into a single node swarm cluster:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker swarm init
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This command should return something like&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;Swarm initialized: current node &lt;span class="o"&gt;(&lt;/span&gt;&amp;lt;node_id&amp;gt;&lt;span class="o"&gt;)&lt;/span&gt; is now a manager.

To add a worker to this swarm, run the following &lt;span class="nb"&gt;command&lt;/span&gt;:

docker swarm &lt;span class="nb"&gt;join&lt;/span&gt; — token &amp;lt;swarm_token&amp;gt; &amp;lt;node_addess+port&amp;gt;

To add a manager to this swarm, run ‘docker swarm join-token manager’ and follow the instructions.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;These are instructions for adding nodes to our cluster. But you don’t need that now.&lt;/p&gt;

&lt;p&gt;Now you need to deploy the &lt;strong&gt;stack&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;In order to achieve that, you need to define a &lt;strong&gt;docker-compose&lt;/strong&gt; file like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;‘3.7’&lt;/span&gt;

&lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;my-network&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;external&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="no"&gt;false&lt;/span&gt;

&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;my-server&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;${IMAGE}&lt;/span&gt;
    &lt;span class="na"&gt;hostname&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;my-server&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;my-server&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;‘8080:8080’&lt;/span&gt;
    &lt;span class="na"&gt;networks&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;my-network&lt;/span&gt;
    &lt;span class="na"&gt;healthcheck&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;test&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="nv"&gt;“CMD”&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;“curl”&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;“-i”&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;http://localhost:8080/health"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
    &lt;span class="na"&gt;deploy&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;mode&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;replicated&lt;/span&gt;
      &lt;span class="na"&gt;replicas&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;2&lt;/span&gt;
      &lt;span class="na"&gt;update_config&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;order&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;start-first&lt;/span&gt;
        &lt;span class="na"&gt;failure_action&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;rollback&lt;/span&gt;
        &lt;span class="na"&gt;delay&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;5s&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this example there is just one container, but you can deploy as many as you need.&lt;/p&gt;

&lt;p&gt;Some concepts:&lt;/p&gt;

&lt;p&gt;— &lt;strong&gt;image&lt;/strong&gt;: a variable that represents the image name and version&lt;/p&gt;

&lt;p&gt;— &lt;strong&gt;network&lt;/strong&gt;: the stack network, with this various services can communicate with each other&lt;/p&gt;

&lt;p&gt;— &lt;strong&gt;healthcheck&lt;/strong&gt;: definitions of commands that verify the service status (up or down). The commands should return &lt;code&gt;error code = 0&lt;/code&gt; when the service status is good and &lt;code&gt;error code != 0&lt;/code&gt; when the service has not started yet, stopped, paused, etc..&lt;/p&gt;

&lt;p&gt;— &lt;strong&gt;replicas&lt;/strong&gt;: the numbers of parallel containers of the service that will be deployed&lt;/p&gt;

&lt;p&gt;You can start your service by the following command:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;export &lt;/span&gt;&lt;span class="nv"&gt;IMAGE&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;IMAGE_NAME&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;:&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;IMAGE_VERSION&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;

docker stack deploy &lt;span class="nt"&gt;-c&lt;/span&gt; docker-compose.yml &amp;lt;stack_name&amp;gt; — with-registry-auth
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once you are ready to update your service, you can use the same command, with a different value of &lt;strong&gt;IMAGE_VERSION&lt;/strong&gt;, and there will be no downtime since Swarm will take care of starting the new containers and, once started correctly, it will also take care of stopping the old ones.&lt;/p&gt;

</description>
      <category>deploy</category>
      <category>docker</category>
      <category>swarm</category>
    </item>
    <item>
      <title>Throttle HTTP requests on paged resources with Vert.x</title>
      <dc:creator>ndr_brt</dc:creator>
      <pubDate>Mon, 21 Dec 2020 12:53:40 +0000</pubDate>
      <link>https://dev.to/cherrychain/throttle-http-requests-on-paged-resources-with-vert-x-5cjm</link>
      <guid>https://dev.to/cherrychain/throttle-http-requests-on-paged-resources-with-vert-x-5cjm</guid>
      <description>&lt;p&gt;Hey Java developers, I know that you are interested in asynchronous programming and I know you are struggling with such paradigm.&lt;br&gt;
I know that because I’m one of you!&lt;/p&gt;

&lt;p&gt;Let us see how to deal with rate limiting issues in the magical async world in Java with Vert.x.&lt;/p&gt;

&lt;p&gt;For the unaware a little recap:&lt;/p&gt;
&lt;h2&gt;
  
  
  What is Vert.x?
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://vertx.io/" rel="noopener noreferrer"&gt;Vert.x&lt;/a&gt; is a polyglot event-driven and non-blocking application framework.&lt;/p&gt;
&lt;h2&gt;
  
  
  What is “Rate limiting”?
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://en.wikipedia.org/wiki/Rate_limiting" rel="noopener noreferrer"&gt;Rate limiting&lt;/a&gt; is a set of policies implemented by a web server used to force client to “throttle” requests from being overloading.&lt;/p&gt;
&lt;h2&gt;
  
  
  What are paged resources?
&lt;/h2&gt;

&lt;p&gt;Paged resources are resources split in various pages to prevent overloading.&lt;/p&gt;

&lt;p&gt;We will work with a real implementation case:&lt;/p&gt;

&lt;p&gt;We are music fanatics (as I am), we are users of &lt;a href="https://www.discogs.com" rel="noopener noreferrer"&gt;Discogs.com&lt;/a&gt;, and we want to fetch its data in our &lt;strong&gt;non-blocking&lt;/strong&gt; application.&lt;/p&gt;

&lt;p&gt;Taking a look at the &lt;a href="https://www.discogs.com/developers" rel="noopener noreferrer"&gt;discogs.com API reference&lt;/a&gt; we will notice that — aside from the API specifications — there are some information about data format, headers, versioning and so on.&lt;br&gt;
We are now interested in two sections: &lt;strong&gt;Rate Limiting&lt;/strong&gt; and &lt;strong&gt;Pagination&lt;/strong&gt;.&lt;br&gt;
Those are the problems to deal with.&lt;/p&gt;

&lt;p&gt;A sketch of our architecture&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F7992%2F1%2AsAQQ_JJqYG8hHsNAYmygFA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F7992%2F1%2AsAQQ_JJqYG8hHsNAYmygFA.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Every arrow from left to right is a method call, every arrow from right to left is a Future.&lt;/p&gt;

&lt;p&gt;We will start from the bottom and go up the river.&lt;/p&gt;
&lt;h2&gt;
  
  
  Rate Limiting
&lt;/h2&gt;

&lt;p&gt;Discogs permits, according with documentation, 25 request per minute, so, 25 will be our rate limit, and one minute is the size of the rate limit window.&lt;/p&gt;

&lt;p&gt;The idea is to have an interface with a method called *execute *which will accept a Request (a data object that represents the request) as parameter and return a Future (because we think async!) of Buffer, that is the response’s body. &lt;br&gt;
Trivial and neat.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;Requests&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Buffer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;execute&lt;/span&gt;&lt;span class="o"&gt;**(&lt;/span&gt;&lt;span class="nc"&gt;Request&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We need to put a delay between requests, hence inside the &lt;em&gt;RequestExecutor&lt;/em&gt; implementation we will use a queue to decouple the &lt;em&gt;execute&lt;/em&gt; invocations from the throttled request execution:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ThrottledRequests&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;Requests&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="o"&gt;...&lt;/span&gt;

  &lt;span class="nd"&gt;@Override&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Buffer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Request&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Buffer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;future&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;future&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
      &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;RequestContext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;        
    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;failedFuture&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="o"&gt;...&lt;/span&gt;

&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;em&gt;RequestContext&lt;/em&gt; is a simple data object that allow us to bind request to its future.&lt;/p&gt;

&lt;p&gt;Then, the request should be consumed from the queue and executed, we will do this with a periodic task by starting it every N milliseconds, where N is a non-fixed value computed from the maximum number of requests we are allowed to do in the rate limit window.&lt;/p&gt;

&lt;p&gt;At first we will start with an optimistic &lt;em&gt;1&lt;/em&gt;, therefore here’s our &lt;em&gt;ThrottledRequests&lt;/em&gt; constructor:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ThrottledRequests&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;Requests&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="no"&gt;RATE_LIMIT_WINDOW_SIZE&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;60000&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Logger&lt;/span&gt; &lt;span class="n"&gt;log&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;getLogger&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;getClass&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;BlockingQueue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;RequestContext&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;LinkedBlockingQueue&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;AtomicLong&lt;/span&gt; &lt;span class="n"&gt;executorId&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;AtomicLong&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;AtomicLong&lt;/span&gt; &lt;span class="n"&gt;actualDelay&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;AtomicLong&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;HttpClient&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Vertx&lt;/span&gt; &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nc"&gt;ThrottledRequests&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Vertx&lt;/span&gt; &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;vertx&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;http&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createHttpClient&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setPeriodic&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;actualDelay&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;executor&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;executorId&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;set&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="o"&gt;...&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Some explanations:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;em&gt;vertx&lt;/em&gt;: you need the vertx instance to launch a periodic executor.&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;actualDelay&lt;/em&gt;: is an AtomicLong that represent the actual delay, so, as we said before, it’s starting value is &lt;em&gt;1.&lt;/em&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;executor()&lt;/em&gt;: core method of the class, it insantiate a function that will perform the throttled requests.&lt;/li&gt;
&lt;li&gt;
&lt;em&gt;executorId&lt;/em&gt;: we need to track the executor task id to stop it when the actualDelay will change. Is an AtomicLong too&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let us see how to handle async request executions:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Handler&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;executor&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;timerId&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;RequestContext&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;queue&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="nc"&gt;Request&lt;/span&gt; &lt;span class="n"&gt;inputRequest&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;request&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
      &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;request&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;inputRequest&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;method&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;inputRequest&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;options&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;putHeader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"User-Agent"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"YourApp/0.1"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setFollowRedirects&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
          &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;bodyHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;future&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="n"&gt;complete&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
          &lt;span class="n"&gt;checkAndUpdateRateLimit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="o"&gt;})&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;end&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;};&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Easy, uh?&lt;br&gt;
On every request responded we complete the future and…checkAndUpdateRateLimit():&lt;br&gt;
according to the API documentation, there is a header in every response we will get from &lt;em&gt;discogs&lt;/em&gt; called &lt;strong&gt;X-Discogs-Ratelimit&lt;/strong&gt;, that tells us how many requests we can do in a rate limit window of 1 minute. Cool.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;checkAndUpdateRateLimit&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;HttpClientResponse&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofNullable&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getHeader&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"X-Discogs-Ratelimit"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;Long:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;parseLong&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;rateLimit&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;rateLimit&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;reqPerMinute&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="no"&gt;RATE_LIMIT_WINDOW_SIZE&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt; &lt;span class="n"&gt;reqPerMinute&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ifPresent&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;throttleDelay&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;throttleDelay&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="n"&gt;actualDelay&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getAndSet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;throttleDelay&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;cancelTimer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;executorId&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="kt"&gt;long&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setPeriodic&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;throttleDelay&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;executor&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
        &lt;span class="n"&gt;executorId&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;set&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
      &lt;span class="o"&gt;}&lt;/span&gt;
    &lt;span class="o"&gt;});&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The calculation is pretty simple:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Check the rate limit header&lt;/li&gt;
&lt;li&gt;Subtract one, just to avoid to fill the window&lt;/li&gt;
&lt;li&gt;Calculate the delay you should take between requests to do less than rateLimit requests in the time window (one minute)&lt;/li&gt;
&lt;li&gt;If the delay is different from the current one, we remove the old executor’s timer and define another one with a new delay time.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Pagination
&lt;/h2&gt;

&lt;p&gt;Handling pagination is a little trickier than rate limit, since there is some work to do with futures, you know, the total page count will be known only after fetching the first page.&lt;/p&gt;

&lt;p&gt;The idea is to divide responsibilities, we will implement three components:&lt;br&gt;
(the example is on the Inventory resource, which retrieves Listing entities)&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;GetListingPage: one implementation for every resource, such class uses Requests as collaborator, knows the url to invoke and knows how to deserialize the single fetched page. So its responsability is the Page.&lt;/li&gt;
&lt;li&gt;Pages uses GetResourcePage to fetch first page and eventually the others, join them. Its responsibility is the Pages.&lt;/li&gt;
&lt;li&gt;Client obtains all the pages from Pages and extract the entities from those.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let’s start from GetListingPage, is a class that implement an interface with one method: apply&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nd"&gt;@Override&lt;/span&gt;
&lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;ListingPage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Integer&lt;/span&gt; &lt;span class="n"&gt;pageNumber&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Request {} inventory page {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pageNumber&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;path&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;format&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"/users/%s/inventory?page=%d"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pageNumber&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="nc"&gt;Request&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Request&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;ListingPage&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;future&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;future&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="n"&gt;executor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;execute&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;setHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;succeeded&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
      &lt;span class="nc"&gt;ListingPage&lt;/span&gt; &lt;span class="n"&gt;page&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromJson&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;ListingPage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
      &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;complete&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;page&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fail&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;cause&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;});&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Nothing special, execute request and deserialize JSON output to ListingPage class (a class that represent the ListingPage structure).&lt;/p&gt;

&lt;p&gt;Maybe the more interesting class is Pages. The public method is getFor:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;T&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getFor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;T&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;.*&lt;/span&gt;&lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;*();&lt;/span&gt;

  &lt;span class="n"&gt;getPage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;setHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;succeeded&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="no"&gt;T&lt;/span&gt; &lt;span class="n"&gt;firstPage&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
      &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;totalPages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;firstPage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;pages&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
      &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Total pages: {}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;totalPages&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

      &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;futures&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;IntStream&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;range&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;totalPages&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapToObj&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;page&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;getPage&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;page&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collectors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

      &lt;span class="nc"&gt;CompositeFuture&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;all&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;futures&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;joinPages&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="n"&gt;firstPage&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;});&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We will ask for the first page and then, knowing the total pages count, we’ll ask for every other page, combining all the Futures into a CompositeFuture.&lt;/p&gt;

&lt;p&gt;When every future will be completed, we will joinPages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Handler&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;AsyncResult&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;CompositeFuture&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;joinPages&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
  &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;T&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;T&lt;/span&gt; &lt;span class="n"&gt;firstPage&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;async&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;succeeded&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="nc"&gt;CompositeFuture&lt;/span&gt; &lt;span class="n"&gt;remnants&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
      &lt;span class="nc"&gt;Collection&lt;/span&gt; &lt;span class="n"&gt;remnantPages&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;IntStream&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;range&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;remnants&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;size&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapToObj&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;remnants:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;resultAt&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Page&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="n"&gt;cast&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collectors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

      &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="no"&gt;T&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;total&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ArrayList&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
      &lt;span class="n"&gt;total&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;add&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;firstPage&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
      &lt;span class="n"&gt;total&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addAll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;remnantPages&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

      &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;complete&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;total&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fail&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;cause&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;};&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, at the higher level (Client) we can just get pages and extract entities:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;listingPages&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;setHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;succeeded&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Listing&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;listings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;async&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;ListingPage:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;listings&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flatMap&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;Collection:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collectors&lt;/span&gt;&lt;span class="o"&gt;.*&lt;/span&gt;&lt;span class="n"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;*()));&lt;/span&gt;

    &lt;span class="o"&gt;...&lt;/span&gt; &lt;span class="k"&gt;do&lt;/span&gt; &lt;span class="n"&gt;your&lt;/span&gt; &lt;span class="n"&gt;logic&lt;/span&gt; &lt;span class="o"&gt;...&lt;/span&gt;

  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Pretty cool, huh?&lt;/p&gt;

&lt;p&gt;Source code:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;em&gt;&lt;a href="https://gist.github.com/andreabertagnolli/f2cfacadc5c7bd00f24284c86f81dc79" rel="noopener noreferrer"&gt;ThrottledRequests&lt;/a&gt;&lt;/em&gt;: &lt;/li&gt;
&lt;li&gt;&lt;a href="https://gist.github.com/andreabertagnolli/8983d0cd1dace16e426b81c72430f14e" rel="noopener noreferrer"&gt;GetListingPage&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://gist.github.com/andreabertagnolli/a7a5d2e2e1980a1554e0b50e0594c622" rel="noopener noreferrer"&gt;Pages&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://gist.github.com/andreabertagnolli/a8398b8187cf02d94aef21b4c0e751cd" rel="noopener noreferrer"&gt;Client&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>vertx</category>
      <category>async</category>
      <category>java</category>
      <category>throttling</category>
    </item>
    <item>
      <title>TDD in an event driven application</title>
      <dc:creator>ndr_brt</dc:creator>
      <pubDate>Mon, 21 Dec 2020 12:52:53 +0000</pubDate>
      <link>https://dev.to/cherrychain/tdd-in-an-event-driven-application-2d6i</link>
      <guid>https://dev.to/cherrychain/tdd-in-an-event-driven-application-2d6i</guid>
      <description>&lt;p&gt;You (should) know, Test Driven Development is one of the greatest practice in software development: it gives you confidence in what you implement, it gives you courage to refactor or change behaviors, it forces you to think about what you are about to implement.&lt;/p&gt;

&lt;p&gt;For sure not every piece of code deserves to be written test-first, but at least you should start thinking: “Can I write a test before?”.&lt;/p&gt;

&lt;p&gt;Passing to an event driven world the process remains the same: &lt;br&gt;
there’s a &lt;a href="http://xunitpatterns.com/SUT.html" rel="noopener noreferrer"&gt;SUT&lt;/a&gt;, a trigger action, and some behavior verification:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F7824%2F1%2ALbGaYOzMRXDYwOcWErK1eQ.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F7824%2F1%2ALbGaYOzMRXDYwOcWErK1eQ.jpeg"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Consider implementing a feature for an hypothetical Continuous Integration application, on tests executed should check the results and notify to the application. This application in based on &lt;a href="https://vertx.io/" rel="noopener noreferrer"&gt;Vert.x framework&lt;/a&gt; and is coded in &lt;a href="https://kotlinlang.org/" rel="noopener noreferrer"&gt;Kotlin language&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;So our first test will be: “&lt;em&gt;on tests executed, when they passed, should emit tests passed”:&lt;/em&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight kotlin"&gt;&lt;code&gt;&lt;span class="nd"&gt;@ExtendWith&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;VertxExtension&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;internal&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CheckTestsExecutionTest&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

  &lt;span class="nd"&gt;@Test&lt;/span&gt;
  &lt;span class="nd"&gt;@Timeout&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timeUnit&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SECONDS&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;internal&lt;/span&gt; &lt;span class="k"&gt;fun&lt;/span&gt; &lt;span class="nf"&gt;`on&lt;/span&gt; &lt;span class="n"&gt;tests&lt;/span&gt; &lt;span class="n"&gt;executed&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;when&lt;/span&gt; &lt;span class="n"&gt;they&lt;/span&gt; &lt;span class="n"&gt;passed&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;should&lt;/span&gt; &lt;span class="n"&gt;emit&lt;/span&gt; &lt;span class="n"&gt;tests&lt;/span&gt; &lt;span class="nf"&gt;passed`&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;Vertx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;VertxTestContext&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

    &lt;span class="nc"&gt;CheckTestsExecution&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;()).&lt;/span&gt;&lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsPassed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;completeNow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
      &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsExecuted"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;mapFrom&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="nc"&gt;TestsExecuted&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;listOf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
          &lt;span class="nc"&gt;TestResult&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"simple test"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;passed&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;true&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;)))&lt;/span&gt;
      &lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There’s a lot of stuff in there. The &lt;em&gt;VertxExtension *class&lt;/em&gt; &lt;em&gt;is a junit5 extension provided by the library *vertx-junit5&lt;/em&gt; that gives some facilities to write Vert.x unit tests.&lt;/p&gt;

&lt;p&gt;But, the very important thing here is that there’s &lt;strong&gt;no assertion&lt;/strong&gt;. The reason is simple: we are testing an &lt;strong&gt;asynchronous behavior&lt;/strong&gt;, so we will get no synchronous output from our .&lt;/p&gt;

&lt;p&gt;What we need here is to set an &lt;strong&gt;expectation&lt;/strong&gt;. To do that we create a consumer on the event we expecting to be launched from our SUT. To make test pass then we call the &lt;em&gt;completeNow&lt;/em&gt; method on &lt;em&gt;context&lt;/em&gt; object (is a Vert.x junit5 extension utility to handle asynchronous tests, as well as &lt;em&gt;@Timeout&lt;/em&gt; annotation).&lt;br&gt;
The test will pass only when &lt;em&gt;completeNow&lt;/em&gt; method is called before the timeout. Simple.&lt;/p&gt;

&lt;p&gt;The last test row emits an event that should trigger our desired behavior.&lt;br&gt;
Obviously, after fixing compilation errors, the test will &lt;strong&gt;fail&lt;/strong&gt;. Red bar, cool.&lt;/p&gt;

&lt;p&gt;Let’s write &lt;strong&gt;only enough code&lt;/strong&gt; to make test pass:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight kotlin"&gt;&lt;code&gt;&lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CheckTestsExecution&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;val&lt;/span&gt; &lt;span class="py"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;EventBus&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

  &lt;span class="k"&gt;fun&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsExecuted"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="n"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsPassed"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt; 
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Green bar. Go on.&lt;/p&gt;

&lt;p&gt;Now, we need to describe the case where one test fails and so we will expect to receive &lt;em&gt;TestsFailed&lt;/em&gt; event. We should also expect not to receive &lt;em&gt;TestsPasses&lt;/em&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight kotlin"&gt;&lt;code&gt;  &lt;span class="nd"&gt;@Test&lt;/span&gt;
  &lt;span class="nd"&gt;@Timeout&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timeUnit&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;SECONDS&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
  &lt;span class="k"&gt;internal&lt;/span&gt; &lt;span class="k"&gt;fun&lt;/span&gt; &lt;span class="nf"&gt;`on&lt;/span&gt; &lt;span class="n"&gt;tests&lt;/span&gt; &lt;span class="n"&gt;executed&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;when&lt;/span&gt; &lt;span class="n"&gt;they&lt;/span&gt; &lt;span class="n"&gt;failed&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;should&lt;/span&gt; &lt;span class="n"&gt;emit&lt;/span&gt; &lt;span class="n"&gt;tests&lt;/span&gt; &lt;span class="nf"&gt;passed`&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;Vertx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;VertxTestContext&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;CheckTestsExecution&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;()).&lt;/span&gt;&lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsPassed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;failNow&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RuntimeException&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"No TestPassed event should be emitted"&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsFailed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
      &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;completeNow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;val&lt;/span&gt; &lt;span class="py"&gt;testResults&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;listOf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
      &lt;span class="nc"&gt;TestResult&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"simple test"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;passed&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;false&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
      &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsExecuted"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;mapFrom&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TestsExecuted&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;testResults&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Red bar again, so, make it green ASAP!&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight kotlin"&gt;&lt;code&gt;&lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;CheckTestsExecution&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;val&lt;/span&gt; &lt;span class="py"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nc"&gt;EventBus&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;

  &lt;span class="k"&gt;fun&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsExecuted"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="p"&gt;-&amp;gt;&lt;/span&gt;
      &lt;span class="kd"&gt;val&lt;/span&gt; &lt;span class="py"&gt;testExecuted&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;body&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;mapTo&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TestsExecuted&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;java&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
      &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;testExecuted&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;none&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="p"&gt;!&lt;/span&gt;&lt;span class="n"&gt;it&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;passed&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsPassed"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;eventBus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"TestsFailed"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;JsonObject&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
      &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
  &lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Perfect, you just implemented an event driven component with TDD on Vert.x in Kotlin. Refactoring is up to you.&lt;/p&gt;

&lt;p&gt;As you see, no great difference from the classic TDD approach: think, write a test, do something and expect behaviors, just pay attention to asynchronous manners and use expectations.&lt;/p&gt;

</description>
      <category>eventdriven</category>
      <category>async</category>
      <category>tdd</category>
      <category>kotlin</category>
    </item>
    <item>
      <title>Handle backpressure between Kafka and a database with Vert.x

</title>
      <dc:creator>ndr_brt</dc:creator>
      <pubDate>Thu, 06 Aug 2020 06:52:39 +0000</pubDate>
      <link>https://dev.to/cherrychain/handle-backpressure-between-kafka-and-a-database-with-vert-x-5a3j</link>
      <guid>https://dev.to/cherrychain/handle-backpressure-between-kafka-and-a-database-with-vert-x-5a3j</guid>
      <description>&lt;p&gt;As we &lt;a href="https://dev.to/cherrychain/introduction-to-vert-x-37nb"&gt;already discussed in the past&lt;/a&gt;, asynchronous programming brings many pros in developing reactive and responsive applications. However, it also carries cons and challenges, one of the main ones is the backpressure problem.&lt;/p&gt;

&lt;h4&gt;
  
  
  What is backpressure?
&lt;/h4&gt;

&lt;p&gt;In physics&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;it is a resistance or force opposing the desired flow of fluid through pipes (&lt;a href="https://en.wikipedia.org/wiki/Back_pressure" rel="noopener noreferrer"&gt;wikipedia&lt;/a&gt;)&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;We can translate the problem on a known scenario: persistence of messages polled from a bus, where there are a huge amount of messages on a bus that our application is polling really fast, but the database underneath is really slow.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F7cdnoazuit8m9ab2za9e.jpg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F7cdnoazuit8m9ab2za9e.jpg" alt="real world example"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;em&gt;How can be funnel overflow be avoided?&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In a synchronous scenario, there's no backpressure's issue, the sync nature of the computation blocks the polling from the bus until the current message is processed.&lt;br&gt;
But, in the async world, the polling is executed without clues on what's happening on the database. So if the database can't handle all the messages coming from the bus, the messages will remain "in between", which means in the memory of our service. &lt;br&gt;
This can lead to failures or, at worst, to service fault.&lt;/p&gt;

&lt;p&gt;Let's try to develop an application that persists messages in a database, and make it evolve to handle backpressure&lt;/p&gt;

&lt;h4&gt;
  
  
  Automatic polling
&lt;/h4&gt;

&lt;p&gt;At first, our verticle will do those operations: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;initialize the JDBC client&lt;/li&gt;
&lt;li&gt;initialize the Kafka client&lt;/li&gt;
&lt;li&gt;subscribe to the topic&lt;/li&gt;
&lt;li&gt;persist the records&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The code is quite simple, and it works well with small amounts of messages. &lt;br&gt;
When the load gets bigger and bigger a problem appears: using the handler of Vertx Kafka consumer means that there's no control on the message ratio, so it will poll continuously without considering the persistance rate, causing memory overload.&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;p&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MainVerticle&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;AbstractVerticle&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="nd"&gt;@Override&lt;/span&gt;&lt;br&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;startPromise&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;JDBCClient&lt;/span&gt; &lt;span class="n"&gt;jdbc&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;JDBCClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;datasourceConfiguration&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;br&gt;
      &lt;span class="o"&gt;.&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="n"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;kafkaConsumerConfiguration&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;&lt;br&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"topic.name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;startPromise&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;br&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
        &lt;span class="n"&gt;persist&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jdbc&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;br&gt;
          &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onSuccess&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message persisted"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;&lt;br&gt;
          &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onFailure&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cause&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;err&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message not persisted "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;cause&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;&lt;br&gt;
      &lt;span class="o"&gt;});&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;kafkaConsumerConfiguration&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;HashMap&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafkahost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;UpdateResult&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;persist&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;JDBCClient&lt;/span&gt; &lt;span class="n"&gt;jdbc&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;UpdateResult&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;promise&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;JsonArray&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;toParams&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;jdbc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;updateWithParams&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"insert or update query to persist record"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;br&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;future&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;JsonObject&lt;/span&gt; &lt;span class="nf"&gt;datasourceConfiguration&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="c1"&gt;// TODO datasource configuration&lt;/span&gt;&lt;br&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;JsonArray&lt;/span&gt; &lt;span class="nf"&gt;toParams&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;KafkaConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="c1"&gt;// TODO: convert the record into params for the sql command&lt;/span&gt;&lt;br&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;&lt;br&gt;
&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
&lt;br&gt;
  &lt;br&gt;
  &lt;br&gt;
  Explicit polling&lt;br&gt;
&lt;/h4&gt;

&lt;p&gt;To handle the backpressure, explicit polling shall be used, and this can be done by avoiding the kafka consumer's handler setting and by calling &lt;code&gt;poll&lt;/code&gt; manually (in the following case, every 100ms).&lt;br&gt;
By using this approach, it can be made so that every poll gets performed only when the batch of previously polled messages are persisted.&lt;br&gt;
This behaviour can be achieved by handling every message's &lt;code&gt;persist&lt;/code&gt; future and collecting all of them with the &lt;code&gt;CompositeFuture.all&lt;/code&gt;, that will succeed only when all the messages are completed, and only in this case the next polling can be made.&lt;br&gt;
If at least one of the future fails, everything will fail, and the polling will stop.&lt;br&gt;
There are various solutions that can be adopted to make the service handle the failure, e.g. sending the message to a &lt;a href="https://en.wikipedia.org/wiki/Dead_letter_queue" rel="noopener noreferrer"&gt;Dead Letter Queue&lt;/a&gt;, but we will not cover this case.&lt;/p&gt;

&lt;p&gt;The problem with this code is that if a message fails, we will lose it, because the consumer is set to auto-commit, so, it's vertx that commits the topic offset.&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;p&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MainVerticle&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;AbstractVerticle&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="nd"&gt;@Override&lt;/span&gt;&lt;br&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;startPromise&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;JDBCClient&lt;/span&gt; &lt;span class="n"&gt;jdbc&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;JDBCClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;datasourceConfiguration&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;br&gt;
      &lt;span class="o"&gt;.&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="n"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;kafkaConsumerConfiguration&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;&lt;br&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"topic.name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;startPromise&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;span class="n"&amp;gt;poll&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;jdbc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;consumer&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;JDBCClient&lt;/span&gt; &lt;span class="n"&gt;jdbc&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;KafkaConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;pollPromise&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pollPromise&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;span class="n"&amp;gt;pollPromise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;future&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;()&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;compose&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;records&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;{&amp;lt;/span&amp;gt;
    &amp;lt;span class="nc"&amp;gt;List&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;lt;&amp;lt;/span&amp;gt;&amp;lt;span class="nc"&amp;gt;Future&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;lt;&amp;lt;/span&amp;gt;&amp;lt;span class="nc"&amp;gt;UpdateResult&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;gt;&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;futures&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;=&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;IntStream&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;range&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="mi"&amp;gt;0&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;records&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;size&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;())&amp;lt;/span&amp;gt;
      &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;mapToObj&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="nl"&amp;gt;records:&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;:&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;recordAt&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;)&amp;lt;/span&amp;gt;
      &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;map&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;record&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;persist&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;jdbc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;record&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;))&amp;lt;/span&amp;gt;
      &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;collect&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;toList&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;());&amp;lt;/span&amp;gt;

    &amp;lt;span class="k"&amp;gt;return&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;CompositeFuture&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;all&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="k"&amp;gt;new&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;ArrayList&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;lt;&amp;amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;futures&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;));&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;})&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;onSuccess&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;composite&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;{&amp;lt;/span&amp;gt;
    &amp;lt;span class="nc"&amp;gt;System&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;out&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;println&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="s"&amp;gt;"All messages persisted"&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
    &amp;lt;span class="n"&amp;gt;poll&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;jdbc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;consumer&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;})&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;onFailure&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;cause&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;System&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;err&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;println&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="s"&amp;gt;"Error persisting messages: "&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;+&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;cause&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;))&amp;lt;/span&amp;gt;
&amp;lt;span class="o"&amp;gt;;&amp;lt;/span&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;kafkaConsumerConfiguration&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;HashMap&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafkahost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="o"&gt;...&lt;/span&gt;&lt;br&gt;
&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
&lt;br&gt;
  &lt;br&gt;
  &lt;br&gt;
  Manual commit&lt;br&gt;
&lt;/h4&gt;

&lt;p&gt;Setting the &lt;code&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/code&gt; properties to &lt;code&gt;false&lt;/code&gt;, the service takes ownership of the topic offset commit.&lt;br&gt;
The commit will be performed only when every message will be persisted, with this trick the &lt;code&gt;at least once&lt;/code&gt; delivery is achieved.&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;p&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MainVerticle&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;AbstractVerticle&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="nd"&gt;@Override&lt;/span&gt;&lt;br&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;start&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;startPromise&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;JDBCClient&lt;/span&gt; &lt;span class="n"&gt;jdbc&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;JDBCClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;datasourceConfiguration&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;br&gt;
      &lt;span class="o"&gt;.&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;&lt;span class="n"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;vertx&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;kafkaConsumerConfiguration&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;&lt;br&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"topic.name"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;startPromise&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;span class="n"&amp;gt;poll&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;jdbc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;consumer&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;JDBCClient&lt;/span&gt; &lt;span class="n"&gt;jdbc&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;KafkaConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;pollPromise&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pollPromise&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;span class="n"&amp;gt;pollPromise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;future&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;()&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;compose&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;records&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;{&amp;lt;/span&amp;gt;
    &amp;lt;span class="nc"&amp;gt;List&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;lt;&amp;lt;/span&amp;gt;&amp;lt;span class="nc"&amp;gt;Future&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;lt;&amp;lt;/span&amp;gt;&amp;lt;span class="nc"&amp;gt;UpdateResult&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;gt;&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;futures&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;=&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;IntStream&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;range&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="mi"&amp;gt;0&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;records&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;size&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;())&amp;lt;/span&amp;gt;
      &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;mapToObj&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="nl"&amp;gt;records:&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;:&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;recordAt&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;)&amp;lt;/span&amp;gt;
      &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;map&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;record&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;persist&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;jdbc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;record&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;))&amp;lt;/span&amp;gt;
      &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;collect&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;toList&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;());&amp;lt;/span&amp;gt;

    &amp;lt;span class="k"&amp;gt;return&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;CompositeFuture&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;all&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="k"&amp;gt;new&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;ArrayList&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;lt;&amp;amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;futures&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;));&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;})&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;compose&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;composite&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;{&amp;lt;/span&amp;gt;
    &amp;lt;span class="nc"&amp;gt;Promise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;lt;&amp;lt;/span&amp;gt;&amp;lt;span class="nc"&amp;gt;Void&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;commitPromise&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;=&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;Promise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;promise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;();&amp;lt;/span&amp;gt;
    &amp;lt;span class="n"&amp;gt;consumer&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;commit&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;commitPromise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
    &amp;lt;span class="k"&amp;gt;return&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;commitPromise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;future&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;();&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;})&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;onSuccess&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;any&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;{&amp;lt;/span&amp;gt;
    &amp;lt;span class="nc"&amp;gt;System&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;out&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;println&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="s"&amp;gt;"All messages persisted and committed"&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
    &amp;lt;span class="n"&amp;gt;poll&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;jdbc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;consumer&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;})&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;onFailure&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;cause&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;System&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;err&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;println&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="s"&amp;gt;"Error persisting and committing messages: "&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;+&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;cause&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;))&amp;lt;/span&amp;gt;
&amp;lt;span class="o"&amp;gt;;&amp;lt;/span&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;kafkaConsumerConfiguration&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;HashMap&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;BOOTSTRAP_SERVERS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafkahost:9092"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;ENABLE_AUTO_COMMIT_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"false"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;KEY_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;put&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;VALUE_DESERIALIZER_CLASS_CONFIG&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;StringDeserializer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getName&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;&lt;br&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;...&lt;/span&gt;&lt;br&gt;
&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
&lt;br&gt;
  &lt;br&gt;
  &lt;br&gt;
  Bonus feature: achieve ordering&lt;br&gt;
&lt;/h4&gt;

&lt;p&gt;With a little effort it's possible to achieve ordering:&lt;br&gt;
&lt;a href="https://dev.to/cherrychain/future-composition-in-vert-x-3gp8"&gt;future composition&lt;/a&gt; allows forcing every persist operation to wait the completion of its precedent.&lt;br&gt;
It's achievable by chaining the async computations one to another, so every one will be executed when the precedent future succeeds.&lt;br&gt;
This is a smart pattern to be used when &lt;code&gt;serialization&lt;/code&gt; is needed.&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;p&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MainVerticle&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;AbstractVerticle&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;...&lt;/span&gt;&lt;br&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;JDBCClient&lt;/span&gt; &lt;span class="n"&gt;jdbc&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;KafkaConsumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;KafkaConsumerRecords&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;pollPromise&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;&lt;br&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;poll&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;pollPromise&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;span class="n"&amp;gt;pollPromise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;future&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;()&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;compose&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;records&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;IntStream&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;range&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="mi"&amp;gt;0&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;records&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;size&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;())&amp;lt;/span&amp;gt;
    &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;mapToObj&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="nl"&amp;gt;records:&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;:&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;recordAt&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;)&amp;lt;/span&amp;gt;
    &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;reduce&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="nc"&amp;gt;Future&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;amp;lt;&amp;lt;/span&amp;gt;&amp;lt;span class="nc"&amp;gt;UpdateResult&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;gt;&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;succeededFuture&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(),&amp;lt;/span&amp;gt;
      &amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;acc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;record&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;)&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;acc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;compose&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;it&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;persist&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;jdbc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;record&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;)),&amp;lt;/span&amp;gt;
      &amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;a&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;b&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;)&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;a&amp;lt;/span&amp;gt;
    &amp;lt;span class="o"&amp;gt;)&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;)&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;compose&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;composite&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;{&amp;lt;/span&amp;gt;
    &amp;lt;span class="nc"&amp;gt;Promise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;lt;&amp;lt;/span&amp;gt;&amp;lt;span class="nc"&amp;gt;Void&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;commitPromise&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;=&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;Promise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;promise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;();&amp;lt;/span&amp;gt;
    &amp;lt;span class="n"&amp;gt;consumer&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;commit&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;commitPromise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
    &amp;lt;span class="k"&amp;gt;return&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;commitPromise&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;future&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;();&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;})&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;onSuccess&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;any&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;{&amp;lt;/span&amp;gt;
    &amp;lt;span class="nc"&amp;gt;System&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;out&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;println&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="s"&amp;gt;"All messages persisted and committed"&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
    &amp;lt;span class="n"&amp;gt;poll&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;jdbc&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;,&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;consumer&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;);&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;})&amp;lt;/span&amp;gt;
  &amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;onFailure&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="n"&amp;gt;cause&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;-&amp;amp;gt;&amp;lt;/span&amp;gt; &amp;lt;span class="nc"&amp;gt;System&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;err&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;.&amp;lt;/span&amp;gt;&amp;lt;span class="na"&amp;gt;println&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;(&amp;lt;/span&amp;gt;&amp;lt;span class="s"&amp;gt;"Error persisting and committing messages: "&amp;lt;/span&amp;gt; &amp;lt;span class="o"&amp;gt;+&amp;lt;/span&amp;gt; &amp;lt;span class="n"&amp;gt;cause&amp;lt;/span&amp;gt;&amp;lt;span class="o"&amp;gt;));&amp;lt;/span&amp;gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;span class="o"&gt;}&lt;/span&gt;&lt;br&gt;
  &lt;span class="o"&gt;...&lt;/span&gt;&lt;br&gt;
&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h4&gt;
&lt;br&gt;
  &lt;br&gt;
  &lt;br&gt;
  Conclusion&lt;br&gt;
&lt;/h4&gt;

&lt;p&gt;Backpressure is a fundamental topic to cover when working with async programming.&lt;br&gt;
It does not come for free out of the vert.x box, but it can be acheived with some simple tricks.&lt;/p&gt;

</description>
      <category>vertx</category>
      <category>backpressure</category>
      <category>java</category>
      <category>async</category>
    </item>
    <item>
      <title>Future Composition in Vert.x</title>
      <dc:creator>ndr_brt</dc:creator>
      <pubDate>Wed, 20 May 2020 11:42:45 +0000</pubDate>
      <link>https://dev.to/cherrychain/future-composition-in-vert-x-3gp8</link>
      <guid>https://dev.to/cherrychain/future-composition-in-vert-x-3gp8</guid>
      <description>&lt;p&gt;For those who don't know, Vert.x is an event driven and non blocking application toolkit.&lt;br&gt;
It's polyglot, so you can use it with different languages (as &lt;strong&gt;Java&lt;/strong&gt;, &lt;strong&gt;Kotlin&lt;/strong&gt;, &lt;strong&gt;JavaScript&lt;/strong&gt;, &lt;strong&gt;Groovy&lt;/strong&gt;, &lt;strong&gt;Ruby&lt;/strong&gt; or &lt;strong&gt;Scala&lt;/strong&gt;).&lt;/p&gt;
&lt;h3&gt;
  
  
  What does "non blocking" mean?
&lt;/h3&gt;

&lt;p&gt;In synchronous programming, when a function is called, the caller has to wait until the result is returned. &lt;br&gt;
This behaviour could lead to performance issues.&lt;/p&gt;

&lt;p&gt;Often the "obvious solution" seems to be &lt;a href="https://en.wikipedia.org/wiki/Concurrent_computing"&gt;concurrent programming&lt;/a&gt;, but dealing with shared resources and threads is not easy and deadlocks are around the corner.&lt;/p&gt;

&lt;p&gt;Explaining how Vert.x guarantees asynchrony through the &lt;strong&gt;event loop&lt;/strong&gt; concept is not in the scope of this article, but anything you may want to know gets unraveled in the great &lt;a href="https://vertx.io/docs/guide-for-java-devs/"&gt;Gentle guide to async programming with Vert.x&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;In a "non blocking" world, when the result of a function can be provided immediately, it will be. Otherwise a handler is provided to handle it when it will be ready.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;asyncFunctionCall&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;doSometingWith&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;})&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Usualy a big "but" gets raised the first time an async piece of code is seen: &lt;em&gt;But... I need that result now!&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;This kind of programming requires a mindset change: it's necessary to know and understand the main patterns.&lt;/p&gt;

&lt;h3&gt;
  
  
  Futures vs Callbacks
&lt;/h3&gt;

&lt;p&gt;There are two ways to deal with async calls in Vert.x.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pass a callback that will be executed on call completion.&lt;/li&gt;
&lt;li&gt;Handle a future returned from the function/method call.&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Callback
&lt;/h4&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;asyncComputation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;asyncResult&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;asyncResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;succeeded&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;asyncResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
    &lt;span class="c1"&gt;// do stuff with result&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// handle failure&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;})&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;A callback is a function passed to an async method used to handle the result of its computation.&lt;br&gt;
It's simple to implement but it brings some drawbacks:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Unit testing becomes not-so-immediate&lt;/li&gt;
&lt;li&gt;Leads to the dreaded &lt;a href="http://callbackhell.com/"&gt;Callback Hell&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;There's more code to be written.&lt;/li&gt;
&lt;/ul&gt;
&lt;h4&gt;
  
  
  Future
&lt;/h4&gt;


&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Future&lt;/span&gt; &lt;span class="n"&gt;future&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;asyncComputation&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onSuccess&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="c1"&gt;// do stuff&lt;/span&gt;
&lt;span class="o"&gt;})&lt;/span&gt;

&lt;span class="n"&gt;future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onFailure&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cause&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="c1"&gt;// handle failure&lt;/span&gt;
&lt;span class="o"&gt;})&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;


&lt;p&gt;To avoid the problems listed for the "callback way", Vert.x implements a concept called &lt;strong&gt;Future&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;A future is an object that &lt;strong&gt;represents the result of an action that may, or may not, have occurred yet&lt;/strong&gt; (cf. &lt;a href="https://vertx.io/docs/apidocs/io/vertx/core/Future.html"&gt;apidoc&lt;/a&gt;).&lt;/p&gt;
&lt;h5&gt;
  
  
  How to switch from a callback-like call to a future one
&lt;/h5&gt;

&lt;p&gt;Consider the callback example shown above. &lt;br&gt;
We want a Future object to take advantage of the patterns described below, but &lt;code&gt;asyncComputation&lt;/code&gt; is a function from another library, so we cannot modify it.&lt;/p&gt;

&lt;p&gt;We can use a &lt;strong&gt;Promise&lt;/strong&gt;.&lt;br&gt;
According to the &lt;a href="https://vertx.io/docs/apidocs/io/vertx/core/Promise.html"&gt;apidoc&lt;/a&gt;, it &lt;strong&gt;represents the writable side of an action that may, or may not, have occurred yet.&lt;/strong&gt;&lt;br&gt;
It perfectly fits our needs:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Promise&lt;/span&gt; &lt;span class="n"&gt;promise&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;asyncComputation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;asyncResult&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;asyncResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;succeeded&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;complete&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;asyncResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;result&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fail&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;asyncResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;cause&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;})&lt;/span&gt;
&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;future&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;That's it. We transformed a callback into a future.&lt;br&gt;
The Promise API's give us a way to make this code more readable:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Promise&lt;/span&gt; &lt;span class="n"&gt;promise&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;asyncComputation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;promise:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;handle&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;promise&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;future&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The &lt;em&gt;handle&lt;/em&gt; method takes care of the completion or failure of the promise, given an async result.&lt;/p&gt;

&lt;h3&gt;
  
  
  Future patterns
&lt;/h3&gt;

&lt;p&gt;The Future object implements some interesting patterns that smartly help resolving async issues:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Map&lt;/li&gt;
&lt;li&gt;Compose&lt;/li&gt;
&lt;li&gt;Recover&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Future Mapping
&lt;/h4&gt;

&lt;p&gt;For those of you who know about the &lt;code&gt;map&lt;/code&gt; function, part of the java &lt;code&gt;Stream&lt;/code&gt; API, this feature should be immediate to understand.&lt;/p&gt;

&lt;p&gt;The Future's map function accepts a function that transforms the result from one type to another.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;asyncComputation&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="c1"&gt;// returns a Future&amp;lt;String&amp;gt;&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;Integer:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;valueOf&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;// returns a Future&amp;lt;Integer&amp;gt;&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onSuccess&lt;/span&gt;&lt;span class="o"&gt;(...)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onFailure&lt;/span&gt;&lt;span class="o"&gt;(...)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h4&gt;
  
  
  Future Composition
&lt;/h4&gt;

&lt;p&gt;The &lt;code&gt;compose&lt;/code&gt; method is similar to &lt;code&gt;map&lt;/code&gt;, but is used when the mapping is an async operation itself:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;asyncComputation&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;Integer:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;valueOf&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;compose&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;retrieveDataById&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="c1"&gt;// retrieveDataById returns a Future&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onSuccess&lt;/span&gt;&lt;span class="o"&gt;(...)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onFailure&lt;/span&gt;&lt;span class="o"&gt;(...)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h4&gt;
  
  
  Future Recovery
&lt;/h4&gt;

&lt;p&gt;Futures can succeed, but can also fail. &lt;br&gt;
To handle a failed future and consider a different behaviour, &lt;code&gt;recover&lt;/code&gt; function can be used:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="n"&gt;asyncComputation&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;Integer:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;valueOf&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;recover&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cause&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Future&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;succeededFuture&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="c1"&gt;// when Integer::valueOf fails, the future could be recovered with a default value&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;compose&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;retrieveDataById&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onSuccess&lt;/span&gt;&lt;span class="o"&gt;(...)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onFailure&lt;/span&gt;&lt;span class="o"&gt;(...)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h4&gt;
  
  
  Concurrent composition
&lt;/h4&gt;

&lt;p&gt;To handle multiple future results at the same time, &lt;code&gt;CompositeFuture&lt;/code&gt; is the class needed, it provides two static factory methods:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;all&lt;/code&gt; returns a future that succeeds if all the futures passed as parameters succeed, and fails if at least one fails.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;CompositeFuture&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;all&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;futureOne&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;futureTwo&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onSuccess&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;compositeResult&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt;
    &lt;span class="c1"&gt;// all futures succeeded&lt;/span&gt;
  &lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onFailure&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cause&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; 
    &lt;span class="c1"&gt;// at least one failed&lt;/span&gt;
  &lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;&lt;code&gt;any&lt;/code&gt; returns a future that succeeds if any one of the futures passed as parameters succeed, and fails if all fail.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;CompositeFuture&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;any&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;futureOne&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;futureTwo&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onSuccess&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;compositeResult&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt;
    &lt;span class="c1"&gt;// at least one succeed&lt;/span&gt;
  &lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onFailure&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cause&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; 
    &lt;span class="c1"&gt;// all failed&lt;/span&gt;
  &lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h3&gt;
  
  
  Conclusions
&lt;/h3&gt;

&lt;p&gt;Future composition API in Vert.x represents a solid way to write simple and affordable async code.&lt;br&gt;
Always remember:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;never throw exceptions into async code, use failed future instead to handle failure behaviours.&lt;/li&gt;
&lt;li&gt;at the end of a future composition, don't forget to handle future's successes (&lt;code&gt;onSuccess&lt;/code&gt;) and failures (&lt;code&gt;onFailure&lt;/code&gt;)&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>vertx</category>
      <category>async</category>
      <category>java</category>
    </item>
  </channel>
</rss>
