<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Lovegiver</title>
    <description>The latest articles on DEV Community by Lovegiver (@lovegiver).</description>
    <link>https://dev.to/lovegiver</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F835080%2F00255e59-1f24-4264-90b0-93080256f893.jpeg</url>
      <title>DEV Community: Lovegiver</title>
      <link>https://dev.to/lovegiver</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/lovegiver"/>
    <language>en</language>
    <item>
      <title>Reactive Pipeline : the App (part 2)</title>
      <dc:creator>Lovegiver</dc:creator>
      <pubDate>Sat, 23 Jul 2022 18:35:00 +0000</pubDate>
      <link>https://dev.to/lovegiver/-reactive-pipeline-the-app-part-2-32ef</link>
      <guid>https://dev.to/lovegiver/-reactive-pipeline-the-app-part-2-32ef</guid>
      <description>&lt;p&gt;The "&lt;em&gt;ReactivePipeline&lt;/em&gt;" project is a simple interface, a single class project. You can find link to the Github repository in &lt;a href="https://dev.to/lovegiver/reactive-pipeline-a-starter-part-1-578n"&gt;Part. #1&lt;/a&gt; of this post.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;ReactiveContext&lt;/code&gt; class just contains the &lt;em&gt;static methods&lt;/em&gt; we need to instantiate the necessary objects for creating a persistent &lt;code&gt;Flux&lt;/code&gt; &lt;em&gt;pipeline&lt;/em&gt; for your whole application.&lt;br&gt;
We will see each of them in the first part of this document.&lt;/p&gt;

&lt;p&gt;Nevertheless, these methods / objects are not the only ones that are required to build a reactive app.&lt;br&gt;
So the second part of this README will present you all the objects needed to make this API complete and working.&lt;/p&gt;
&lt;h2&gt;
  
  
  How things may happen, how it will work
&lt;/h2&gt;

&lt;p&gt;To start, just think about the fact that among all the operations of a particular process, we can distinguish :&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Starting operations&lt;/strong&gt; which do not take any arguments, thus have no &lt;em&gt;predecessors&lt;/em&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Ending operations&lt;/strong&gt; which take arguments and have no &lt;em&gt;successors&lt;/em&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Intermediate operations&lt;/strong&gt; are the others : they need arguments from their predecessors and produce outputs that will be their successors' inputs&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Most of the time, we use to design and build apps containing methods which are triggered sequentially in a very procedural way. This can be represented by a straight line of processing operations : A --&amp;gt; B --&amp;gt; C --&amp;gt; ...&lt;/p&gt;

&lt;p&gt;But we can also imagine operations as a &lt;strong&gt;tree&lt;/strong&gt; in which methods A and B are independent, so parallelized, and both producing a result which C deserves. In such a case, the C function will take result_A and result_B as arguments and we'll have to synchronize both operations in order to pass their respective results to C : C(result_A, result_B)&lt;br&gt;
Or, at the opposite, a A function producing a result_A which will be consumed in a parallelized manner by B and C as soon as it will be available : B(result_A) // C(result_A)&lt;/p&gt;

&lt;p&gt;To face all of these situations, we need a flexible data-structure where data - thus functions producing these data - will be organized smartly. Wrappers has been used for this to be possible.&lt;/p&gt;
&lt;h2&gt;
  
  
  The toolbox
&lt;/h2&gt;

&lt;p&gt;Many objects we'll talk about are wrappers. It is important to understand how they interact with each others.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;code&gt;Operation&lt;/code&gt; is &lt;em&gt;the corner stone of our model&lt;/em&gt;. It is a &lt;code&gt;Functional Interface&lt;/code&gt;. Each action, each method, each function, has to be an &lt;code&gt;Operation&lt;/code&gt;. An &lt;code&gt;Operation&lt;/code&gt; takes a &lt;em&gt;varargs&lt;/em&gt; of &lt;code&gt;Flux&lt;/code&gt;(es) as arguments and produces a &lt;code&gt;Flux&lt;/code&gt;.
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;Task&lt;/code&gt; wraps a single &lt;code&gt;Operation&lt;/code&gt;. It is a class with some useful properties and methods. It triggers &lt;code&gt;Operation&lt;/code&gt; execution and inject the produced &lt;code&gt;Flux&lt;/code&gt; into the next &lt;code&gt;Operation&lt;/code&gt; to maintain the reactive behavior.
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;WorkGroup&lt;/code&gt; is a wrapper for a set of &lt;code&gt;Task&lt;/code&gt;s, but you won't use it directly. You will use it only if you decide to create your own &lt;code&gt;Optimizer&lt;/code&gt;. If you rely on the default &lt;code&gt;Optimizer&lt;/code&gt;, the &lt;code&gt;Pipeline&lt;/code&gt; will create &lt;code&gt;WorkGroup&lt;/code&gt;s for you. All you have to  understand about a &lt;code&gt;WorkGroup&lt;/code&gt; is that it groups all &lt;code&gt;Task&lt;/code&gt;s involved into the realization of a common final &lt;code&gt;Operation&lt;/code&gt;.
&lt;/li&gt;
&lt;li&gt;Finally, the &lt;code&gt;Pipeline&lt;/code&gt; is a wrapper for a set of &lt;code&gt;Task&lt;/code&gt;s (and also for one or more &lt;code&gt;WorkGroup&lt;/code&gt;s as it will dispatch all the tasks in different work-groups).&lt;/li&gt;
&lt;li&gt;A last object to talk about is the &lt;code&gt;DataStreamer&lt;/code&gt;. It's aside from preceding objects as it is not a wrapper but the mechanism used to export the state of all &lt;code&gt;Monitorable&lt;/code&gt; classes : &lt;code&gt;Task&lt;/code&gt;, &lt;code&gt;WorkGroup&lt;/code&gt; and &lt;code&gt;Pipeline&lt;/code&gt; all have a &lt;code&gt;Monitor&lt;/code&gt; property that describes their current state (new, running, done, in error). The &lt;code&gt;DataStreamer&lt;/code&gt; produces a &lt;code&gt;Flux&lt;/code&gt; containing all the states of all objects within the &lt;code&gt;Pipeline&lt;/code&gt;. Each time one's state changes, a tick is triggered by a &lt;code&gt;Notifier&lt;/code&gt; to the &lt;code&gt;DataStreamer&lt;/code&gt;, which in turn triggers a new &lt;code&gt;Flux&lt;/code&gt; that can, for example, be displayed on a web page for monitoring purpose. But it can be whatever you need. This is just the way I've chosen to talk about what is known under the &lt;strong&gt;&lt;em&gt;hot stream&lt;/em&gt;&lt;/strong&gt; name.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;The global philosophy is :&lt;/strong&gt;&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;We create all necessary &lt;code&gt;Operation&lt;/code&gt;s. Try to think this object as a pure function, doing just one thing.&lt;/li&gt;
&lt;li&gt;Each &lt;code&gt;Operation&lt;/code&gt; is wrapped into a &lt;code&gt;Task&lt;/code&gt; object. To be instantiated, a &lt;code&gt;Task&lt;/code&gt; must have a single &lt;code&gt;Operation&lt;/code&gt; and a Set of all the &lt;em&gt;previous&lt;/em&gt; &lt;code&gt;Task&lt;/code&gt;s whom produced &lt;code&gt;Flux&lt;/code&gt;es are arguments for this &lt;code&gt;Task&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;All the &lt;code&gt;Task&lt;/code&gt;s will finally be used as arguments for a &lt;code&gt;Pipeline&lt;/code&gt;. The &lt;code&gt;Pipeline&lt;/code&gt;, thanks to its &lt;code&gt;Optimizer&lt;/code&gt;, create one or more &lt;code&gt;WorkGroup&lt;/code&gt;s. Once this is made, all &lt;code&gt;WorkGroup&lt;/code&gt;s will be executed in &lt;em&gt;parallel threads&lt;/em&gt; and in an &lt;em&gt;asynchronous&lt;/em&gt; manner.&lt;/li&gt;
&lt;/ol&gt;
&lt;h2&gt;
  
  
  Objects from the ReactiveContext
&lt;/h2&gt;
&lt;h3&gt;
  
  
  The Pipeline
&lt;/h3&gt;

&lt;p&gt;The Pipeline class is a wrapper for a set of Tasks. When calling its &lt;code&gt;.execute()&lt;/code&gt; method, then all &lt;code&gt;Operation&lt;/code&gt;s will be executed from the very first starting ones to the ending ones.&lt;/p&gt;

&lt;p&gt;You can obtain a &lt;code&gt;Pipeline&lt;/code&gt; using :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Pipeline&lt;/span&gt; &lt;span class="nf"&gt;createPipeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Set&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;allTasks&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;

    &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Pipeline&lt;/span&gt; &lt;span class="nf"&gt;createPipeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;pipelineName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Set&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;allTasks&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;WorkGroupOptimizer&lt;/span&gt; &lt;span class="n"&gt;optimizer&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the second method, you'll have to define your own &lt;code&gt;Optimizer&lt;/code&gt;. This means that you define you own logic to group Tasks into &lt;code&gt;WorkGroup&lt;/code&gt;s. In order to define your own &lt;code&gt;Optimizer&lt;/code&gt;, you'll have to implement the following &lt;code&gt;Functional Interface&lt;/code&gt; :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nc"&gt;Collection&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;WorkGroup&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;optimize&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Set&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;allTasks&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The existing default &lt;code&gt;Optimizer&lt;/code&gt;'s logic is very basic and may be hugely improved and optimized (it is part of my To-Do list).&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;First it looks for all ending (final, terminal) Tasks&lt;/li&gt;
&lt;li&gt;Then, it groups into a same WorkGroup all the Task involved into the realization of the same final Task&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  The Task
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;Task&lt;/code&gt; is the &lt;code&gt;Operation&lt;/code&gt;'s wrapper. The &lt;code&gt;Operation&lt;/code&gt; defines the domain's &lt;em&gt;logic&lt;/em&gt; whereas the &lt;code&gt;Task&lt;/code&gt; organize the interactions with other &lt;code&gt;Operation&lt;/code&gt;s.&lt;br&gt;
This is why the &lt;code&gt;Task&lt;/code&gt; takes a &lt;code&gt;List&amp;lt;Task&amp;gt;&lt;/code&gt; as argument.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;createTask&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;taskName&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Operation&lt;/span&gt; &lt;span class="n"&gt;operation&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;predecessors&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When defining a Task T, what you concretely do is :&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Referencing the Operation to execute&lt;/li&gt;
&lt;li&gt;Referencing the Task(s) whom resulting Flux(es) will be used as argument for the task T's Operation&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  The DataStreamer
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;DataStreamer&lt;/code&gt; produces a &lt;strong&gt;&lt;em&gt;hot stream&lt;/em&gt;&lt;/strong&gt;, a potentially &lt;strong&gt;never ending&lt;/strong&gt; &lt;code&gt;Flux&lt;/code&gt;.&lt;br&gt;
For the sake of demonstration, we used it as an exportable monitoring tool. This means that you can define a &lt;em&gt;REST controller&lt;/em&gt; and a GET method returning a &lt;code&gt;Flux&amp;lt;ServerSentEvent&amp;gt;&lt;/code&gt; that will be consumed by a web app.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;ServerSentEvent&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getAllPipelinesStatesFlux&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;

    &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;ServerSentEvent&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getSinglePipelineStatesFlux&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Pipeline&lt;/span&gt; &lt;span class="n"&gt;pipeline&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In the case you define multiple Pipelines, the DataStreamer can produces a Flux for all of them or a single one. For a single Pipeline, you'll use the second method.&lt;/p&gt;

&lt;h2&gt;
  
  
  Other useful objects
&lt;/h2&gt;

&lt;p&gt;Other useful objects are not directly accessible through the &lt;code&gt;ReactiveContext&lt;/code&gt; class.&lt;/p&gt;

&lt;h3&gt;
  
  
  Operation
&lt;/h3&gt;

&lt;p&gt;As already said, &lt;code&gt;Operation&lt;/code&gt; is the corner-stone of this API. This interface is used to define you domain's logic. As it is a &lt;code&gt;Functional Interface&lt;/code&gt;, you can use it as a Lambda expression.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;?&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;?&amp;gt;...&lt;/span&gt; &lt;span class="n"&gt;inputs&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;TaskExecutionException&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can take some frustrating examples to show how to use it :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nc"&gt;Operation&lt;/span&gt; &lt;span class="n"&gt;o1&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;inputs&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;range&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;Operation&lt;/span&gt; &lt;span class="n"&gt;o2&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;inputs&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;range&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;91&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;Operation&lt;/span&gt; &lt;span class="n"&gt;o3&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;inputs&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;  
      &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;?&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;int1&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;inputs&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;];&lt;/span&gt;  
      &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;?&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;int2&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;inputs&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;];&lt;/span&gt;  
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;zip&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;int1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;int2&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="n"&gt;y&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;  
    &lt;span class="o"&gt;};&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Operation o1 will produce a &lt;code&gt;Flux&amp;lt;Integer&amp;gt;&lt;/code&gt; : 1, 2, 3... 10&lt;br&gt;
Operation o2 will produce a &lt;code&gt;Flux&amp;lt;Integer&amp;gt;&lt;/code&gt; : 91, 92, 93... 100&lt;br&gt;
Operation o3 will use each single value from preceding &lt;code&gt;Flux&lt;/code&gt;es by creating tuples that will be processed for the producing of a new result (it is the way to use the &lt;code&gt;.zip&lt;/code&gt; operator) : &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;(1, 91) will produce 92&lt;/li&gt;
&lt;li&gt;(2, 92) will produce 94&lt;/li&gt;
&lt;li&gt;(3, 93) will produce 96&lt;/li&gt;
&lt;li&gt;etc.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Of course, this is possible only if you have created the necessary Tasks objects around your Operations :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nc"&gt;Task&lt;/span&gt; &lt;span class="n"&gt;t1&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ReactiveContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createTask&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Integer Flux 1"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;o1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;emptyList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="nc"&gt;Task&lt;/span&gt; &lt;span class="n"&gt;t2&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ReactiveContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createTask&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Integer Flux 2"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;o2&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;emptyList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="nc"&gt;Task&lt;/span&gt; &lt;span class="n"&gt;t3&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ReactiveContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createTask&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Sum t1 t2"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;o3&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;t2&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There's many things to say here.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;the &lt;code&gt;Operation&lt;/code&gt;'s single abstract method, &lt;code&gt;process(Flux... inputs)&lt;/code&gt;, may take 0, 1 or N &lt;code&gt;Flux&lt;/code&gt;(es) as argument. This is why the Lambda expression starts this way : &lt;code&gt;inputs -&amp;gt; ... ;&lt;/code&gt; In the case of a starting &lt;code&gt;Operation&lt;/code&gt;, an &lt;code&gt;Operation&lt;/code&gt; without any &lt;em&gt;predecessors&lt;/em&gt;, there's no inputs to process but we have to respect the method's signature. In the example above, only the &lt;strong&gt;o3&lt;/strong&gt; operation has inputs to process and this is done by getting them from the array of &lt;code&gt;Flux&lt;/code&gt;es produced by the varargs argument.&lt;/li&gt;
&lt;li&gt;the &lt;strong&gt;t1&lt;/strong&gt; and &lt;strong&gt;t2&lt;/strong&gt; &lt;code&gt;Task&lt;/code&gt;s wrap starting &lt;code&gt;Operation&lt;/code&gt;s, that's why there's no previous &lt;code&gt;Task&lt;/code&gt;s to declare here. But we still have to pass an empty collection as argument.&lt;/li&gt;
&lt;li&gt;the &lt;strong&gt;t3&lt;/strong&gt; &lt;code&gt;Task&lt;/code&gt; do have predecessors, respectively the &lt;strong&gt;t1&lt;/strong&gt; and &lt;strong&gt;t2&lt;/strong&gt; &lt;code&gt;Task&lt;/code&gt;s which are respectively wrapping &lt;strong&gt;o1&lt;/strong&gt; and &lt;strong&gt;o2&lt;/strong&gt; &lt;code&gt;Operation&lt;/code&gt;s. In that case, we pass a collection made of the &lt;strong&gt;t1&lt;/strong&gt; and &lt;strong&gt;t2&lt;/strong&gt; &lt;code&gt;Task&lt;/code&gt;s. This collection is a &lt;code&gt;List&lt;/code&gt;, because the order of the argument matters of course. &lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Notifier / StateNotifier
&lt;/h3&gt;

&lt;p&gt;You theoretically will not have to handle the &lt;code&gt;Notifier&lt;/code&gt; (interface) and the &lt;code&gt;StateNotifier&lt;/code&gt; (implementation) in charge of notifying the &lt;code&gt;DataStreamer&lt;/code&gt; of any change in any &lt;code&gt;Monitorable&lt;/code&gt;'s inner state.&lt;br&gt;
We here use a &lt;em&gt;Visitor&lt;/em&gt;'s pattern to delegate the action of notifying to an independent object knowing the &lt;code&gt;DataStreamer&lt;/code&gt; and the &lt;code&gt;Pipeline&lt;/code&gt; it is reporting about.&lt;/p&gt;
&lt;h3&gt;
  
  
  Monitorable / Monitor
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;Monitor&lt;/code&gt; is a class holding the inner state of any &lt;code&gt;Monitorable&lt;/code&gt; object. Like we have seen before, &lt;code&gt;Monitorable&lt;/code&gt; objects are :&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;Pipeline&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;WorkGroup&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;Task&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The &lt;code&gt;Monitorable&lt;/code&gt; class is the &lt;em&gt;mother class&lt;/em&gt; from which the 3 above objects are derived. Its properties are :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;Monitor&lt;/span&gt; &lt;span class="n"&gt;monitor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;Notifier&lt;/span&gt; &lt;span class="n"&gt;notifier&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;?&amp;gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;inputFluxesMap&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;synchronizedMap&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;LinkedHashMap&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This last property is a central part of our system. We actually use it exclusively for &lt;code&gt;Task&lt;/code&gt;s management, but it also could be used for &lt;code&gt;WorkGroup&lt;/code&gt;s with an adapted &lt;code&gt;Optimizer&lt;/code&gt;.&lt;br&gt;
You may have noticed that our &lt;code&gt;Monitorable&lt;/code&gt;s are designed like a &lt;em&gt;Composite&lt;/em&gt; object. Thus, not only &lt;code&gt;Task&lt;/code&gt;s may receive &lt;code&gt;Flux&lt;/code&gt;es as arguments from predecessors, but also &lt;code&gt;WorkGroup&lt;/code&gt;s. This is still a work-in-progress that will be solved with a non-basic &lt;code&gt;Optimizer&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Have fun.&lt;/p&gt;

&lt;p&gt;Lovegiver&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Written with &lt;a href="https://stackedit.io/"&gt;StackEdit&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>reactive</category>
      <category>java</category>
      <category>api</category>
      <category>tutorial</category>
    </item>
    <item>
      <title>Reactive Pipeline : a starter (part 1)</title>
      <dc:creator>Lovegiver</dc:creator>
      <pubDate>Wed, 20 Jul 2022 18:58:00 +0000</pubDate>
      <link>https://dev.to/lovegiver/reactive-pipeline-a-starter-part-1-578n</link>
      <guid>https://dev.to/lovegiver/reactive-pipeline-a-starter-part-1-578n</guid>
      <description>&lt;h2&gt;
  
  
  Reactive Pipeline : a starter (part 1)
&lt;/h2&gt;

&lt;p&gt;This article is about an app I've made for learning the basics of &lt;strong&gt;Reactive Programming&lt;/strong&gt; &lt;strong&gt;in Java&lt;/strong&gt;. &lt;br&gt;
It was in reaction to the lack of practical sources that I've decided to make it on my own.&lt;br&gt;
There's a lot of resources already available, and I really thank all the authors for giving me the ABC of this definitely &lt;em&gt;new programming paradigm&lt;/em&gt;, but they often describe the same concepts and basic stuff. It is important of course, but the subject is hard when time comes to make it concrete and working.&lt;/p&gt;

&lt;p&gt;This post is an introduction. I need it to explain my motivations and convince you that the points I will mention are relevant. Explanations about the app itself will come in the next post that I'm actively writing.&lt;/p&gt;

&lt;p&gt;Before you start, you have to know the followings :&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;this technology is quite recent in the Java world. Spring's
engeeniers work hard on it. Things you learn today may change.&lt;/li&gt;
&lt;li&gt;this technology just begins to be used in    enterprises. People
mostly don't know anything about it.
&lt;/li&gt;
&lt;li&gt;this technology has a hard learning curve&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You can find the app on my &lt;a href="https://github.com/Lovegiver/ReactivePipeline/tree/master"&gt;Github repo&lt;/a&gt;.&lt;/p&gt;
&lt;h3&gt;
  
  
  Why this app.
&lt;/h3&gt;

&lt;p&gt;&lt;em&gt;ReactivePipeline&lt;/em&gt; is a simple library of my own, built around the &lt;em&gt;TaskPipeline&lt;/em&gt; project. &lt;/p&gt;

&lt;p&gt;Its aim is simply to provide a clear single class, named &lt;code&gt;ReactiveContext&lt;/code&gt;, with a few static methods providing all necessary objects to build a simple yet working reactive application. Under the hood, there's the &lt;strong&gt;Spring's Reactor&lt;/strong&gt; project, a very little part to be honest.&lt;/p&gt;

&lt;p&gt;Why did I do that ? For both the pleasure of learning reactive programming and share that little knowledge with the ones who are interested in it, the ones who would like to learn more but have not enough time, and maybe the ones who, just like I do, remain hungry after reading available tutorials.&lt;/p&gt;

&lt;p&gt;If you have ever read some tutorials about &lt;strong&gt;reactive programming&lt;/strong&gt;, maybe did you tell yourself that creating a &lt;strong&gt;Flux&lt;/strong&gt; was not the most difficult part.&lt;/p&gt;

&lt;p&gt;Take a simple example from &lt;a href="https://www.baeldung.com/reactor-core"&gt;Baeldung's post related to Reactor core&lt;/a&gt; :&lt;/p&gt;

&lt;p&gt;With such code :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Integer&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;elements&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ArrayList&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;

&lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;just&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;log&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
  &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;elements:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;add&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

&lt;span class="n"&gt;assertThat&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;elements&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;containsExactly&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;you will produce this result in the console, thanks to the &lt;code&gt;.log()&lt;/code&gt; statement :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;19.550&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="no"&gt;INFO&lt;/span&gt;  &lt;span class="n"&gt;reactor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;onSubscribe&lt;/span&gt;&lt;span class="o"&gt;([&lt;/span&gt;&lt;span class="nc"&gt;Synchronous&lt;/span&gt; &lt;span class="nc"&gt;Fuseable&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="nc"&gt;FluxArray&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ArraySubscription&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;19.553&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="no"&gt;INFO&lt;/span&gt;  &lt;span class="n"&gt;reactor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;unbounded&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;19.553&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="no"&gt;INFO&lt;/span&gt;  &lt;span class="n"&gt;reactor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;onNext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;19.553&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="no"&gt;INFO&lt;/span&gt;  &lt;span class="n"&gt;reactor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;onNext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;19.553&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="no"&gt;INFO&lt;/span&gt;  &lt;span class="n"&gt;reactor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;onNext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;19.553&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="no"&gt;INFO&lt;/span&gt;  &lt;span class="n"&gt;reactor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;onNext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;19.553&lt;/span&gt; &lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="o"&gt;]&lt;/span&gt; &lt;span class="no"&gt;INFO&lt;/span&gt;  &lt;span class="n"&gt;reactor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Array&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="n"&gt;onComplete&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Of course, the &lt;code&gt;.subscribe()&lt;/code&gt; statement does the job and will fill the &lt;code&gt;elements&lt;/code&gt; collection with each received value. The assertion confirm that.&lt;/p&gt;

&lt;p&gt;But... don't you find this very frustrating ?&lt;/p&gt;

&lt;p&gt;As you can see, there's no difficulty in creating a Flux dispatching integer values before putting each of them in a collection.&lt;br&gt;
But it is &lt;em&gt;not sufficient&lt;/em&gt;, because for me the main question is : &lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;How to maintain a process as a Flux from the beginning to the end of our processing operations ?&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;My main objective in writing this app, consequently, was to find out a simple way to &lt;em&gt;keep all operations reactive, from the very first one to the very last one&lt;/em&gt;. Because writing a reactive app is just like a relay race : when a runner is about to end his tour, the next runner has already started to run so that there's no waiting time, no speed loss at the very moment of passing the baton.&lt;/p&gt;

&lt;p&gt;This is why reactive programming seems &lt;strong&gt;a good alternative to imperative paradigm&lt;/strong&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Imperative programming
&lt;/h3&gt;

&lt;p&gt;Imperative programming is our everyday-way of making our job. We know it so much that we don't question it and accept its constraints for 2 main reasons :&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;it is normal to do as we learnt to, but the way we learnt is also induced by the procedural nature of most of the languages&lt;/li&gt;
&lt;li&gt;it is the way our brain works : cause -&amp;gt; consequence in a single thread and sequentially&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;In an imperative programming style, we use to design operations that will be executed one after the other.&lt;/p&gt;

&lt;p&gt;methodA(...) --&amp;gt; methodB(...) --&amp;gt; methodC(...)&lt;/p&gt;

&lt;p&gt;When &lt;code&gt;methodA(...)&lt;/code&gt; is called, it does its stuff and triggers the next method when ending. During this time, &lt;code&gt;methodB(...)&lt;/code&gt; is waiting, and so is &lt;code&gt;methodC(...)&lt;/code&gt;. &lt;/p&gt;

&lt;p&gt;Of course, we all know some workarounds like iterating on a collection and parallelizing calls to &lt;code&gt;methodB(...)&lt;/code&gt; and then &lt;code&gt;methodC(...)&lt;/code&gt;. But we have to deal with &lt;em&gt;threads&lt;/em&gt;, what is funny in fact but hard to debug in fact too.&lt;/p&gt;

&lt;p&gt;Or we can of course exploit all the possibilities of &lt;em&gt;Functional programming&lt;/em&gt;, like parallel-streaming collections, but code will quickly become unreadable with functions into functions into functions...&lt;/p&gt;

&lt;p&gt;Think about a call to a repository that is returning many objects. You won't be able to process any of these results before the database server has returned its result set. It could be a request taking a long time, and nothing happens in the meanwhile.&lt;br&gt;
At the opposite, reactive programming will send you objects from the repository each time one is found. While the database server is still processing your request, you can start processing what has already been found so far.&lt;/p&gt;

&lt;p&gt;A reactive app can run faster not because it is intrinsically faster but just never waits.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Never waiting is what we want to design and build. We want pipes with&lt;br&gt;
liquid data flowing inside. &lt;strong&gt;&lt;em&gt;We want a Flux&lt;/em&gt;&lt;/strong&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  ReactivePipeline : disclaimer before we start using it
&lt;/h3&gt;

&lt;p&gt;This is what the &lt;em&gt;ReactivePipeline&lt;/em&gt; is about : create a persistent Flux across the whole application.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Does-it mean that there's no reactive code to write ?&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;No. I will talk to you about the &lt;code&gt;Operation&lt;/code&gt; class that is a pure reactive object.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Does-it mean that you won't have to wire all operations in a&lt;br&gt;
continuous flux by yourself ?&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Yes. I will talk to you about the &lt;code&gt;Task&lt;/code&gt; and &lt;code&gt;Pipeline&lt;/code&gt; classes that are dedicated to &lt;em&gt;wire&lt;/em&gt; operations together so easily.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Is it the perfect app for making things in a reactive way ?&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Well, I guess that once you will understand how this modest project works, you will have all the necessary knowledge to use pure Spring Reactor classes. Just take it as a work-in-progress library, with known defaults, that can be improved in so many ways (I will transparently talk about too).&lt;/p&gt;

&lt;p&gt;See you next time for practical things : &lt;a href="https://dev.to/lovegiver/-reactive-pipeline-the-app-part-2-32ef"&gt;Reactive Pipeline : the App&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Lovegiver&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Written with &lt;a href="https://stackedit.io/"&gt;StackEdit&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>reactive</category>
      <category>java</category>
      <category>api</category>
      <category>tutorial</category>
    </item>
  </channel>
</rss>
