<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Dominic Fox</title>
    <description>The latest articles on DEV Community by Dominic Fox (@poetix).</description>
    <link>https://dev.to/poetix</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3236%2F424824.jpeg</url>
      <title>DEV Community: Dominic Fox</title>
      <link>https://dev.to/poetix</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/poetix"/>
    <language>en</language>
    <item>
      <title>Event Replaying with Hazelcast Jet</title>
      <dc:creator>Dominic Fox</dc:creator>
      <pubDate>Thu, 16 Feb 2017 15:42:19 +0000</pubDate>
      <link>https://dev.to/poetix/event-replaying-with-hazelcast-jet</link>
      <guid>https://dev.to/poetix/event-replaying-with-hazelcast-jet</guid>
      <description>&lt;p&gt;(the following article is &lt;a href="https://opencredo.com/event-replaying-hazelcast-jet/" rel="canonical"&gt;cross-posted&lt;/a&gt; from the &lt;a href="https://opencredo.com/blog"&gt;OpenCredo Blog&lt;/a&gt;, where Dominic and other &lt;a href="https://opencredo.com"&gt;OpenCredo&lt;/a&gt; authors write about technologies and technology challenges that interest and excite them)&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction: Parallel Stream Processing
&lt;/h2&gt;

&lt;p&gt;One of the stated intentions behind the design of &lt;a href="https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html"&gt;Java 8's Streams API&lt;/a&gt; was to take better advantage of the multi-core processing power of modern computers. Operations that could be performed on a single, linear stream of values could also be run in parallel by splitting that stream into multiple sub-streams, and combining the results from processing each sub-stream as they became available.&lt;/p&gt;

&lt;p&gt;For example, suppose we wanted to compute the hash of a large array of &lt;code&gt;String&lt;/code&gt; values. The simplest method is to use &lt;code&gt;Arrays.deepHashCode&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;hash&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;deepHashCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;but we could also gather the hashes of all of the &lt;code&gt;String&lt;/code&gt;s in the array into an array of &lt;code&gt;int&lt;/code&gt;s and then take the hash of that:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;hash&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapToInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;String:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toArray&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Having done this, we can then parallelise the hashing operation over all the threads in the default Java &lt;code&gt;ForkJoinPool&lt;/code&gt;, by making the stream parallel:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;hash&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;parallel&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
    &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapToInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;String:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toArray&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here's a test which uses the &lt;a href="http://databene.org/contiperf"&gt;Contiperf&lt;/a&gt; performance testing library to gather some metrics on these three methods:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;PerformanceTest&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nd"&gt;@Rule&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;ContiPerfRule&lt;/span&gt; &lt;span class="n"&gt;contiperfRule&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ContiPerfRule&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;strings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;IntStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;range&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1000000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapToObj&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;randomString&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toArray&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]::&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="nf"&gt;randomString&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Random&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ThreadLocalRandom&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;current&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;bytes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;];&lt;/span&gt;
        &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextBytes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bytes&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;String&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bytes&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Test&lt;/span&gt;
    &lt;span class="nd"&gt;@PerfTest&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;invocations&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;warmUp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;hashWithDeepHashCode&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;deepHashCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Test&lt;/span&gt;
    &lt;span class="nd"&gt;@PerfTest&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;invocations&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;warmUp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;hashWithLinearStream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;mapToInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;String:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toArray&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Test&lt;/span&gt;
    &lt;span class="nd"&gt;@PerfTest&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;invocations&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;warmUp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;10000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;hashWithParallelStream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;parallel&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;mapToInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;String:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toArray&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and here are the results for the three approaches:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;com.opencredo.eventhose.PerformanceTest.hashWithParallelStream
samples: 8536
max:     19
average: 6.018158388003749
median:  6

com.opencredo.eventhose.PerformanceTest.hashWithDeepHashCode
samples: 9234
max:     29
average: 10.06670998483864
median:  11

com.opencredo.eventhose.PerformanceTest.hashWithLinearStream
samples: 9262
max:     40
average: 12.909090909090908
median:  13
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Much as we would expect, linearly converting an array of &lt;code&gt;String&lt;/code&gt;s to an array of &lt;code&gt;int&lt;/code&gt;s and then hashing it is around 20% slower than just using &lt;code&gt;Arrays.deepHashCode&lt;/code&gt;. But when we parallelise the hashing of each individual &lt;code&gt;String&lt;/code&gt;, we see a significant speed-up as more CPU cores are brought into play, each processing a subset of the values in our original array.&lt;/p&gt;

&lt;p&gt;We're still limited, however, to the CPU and memory resources available to us on a single machine. What if we could parallelise stream processing tasks across a &lt;em&gt;cluster&lt;/em&gt; of machines?&lt;/p&gt;

&lt;h2&gt;
  
  
  Hazelcast Jet
&lt;/h2&gt;

&lt;p&gt;Hazelcast Jet builds two layers on top of the Hazelcast distributed data grid. The first maps a DAG (Directed Acyclic Graph) representing computational flow over the grid, providing a general-purpose processing model for distributed dataflow. Data is provided by sources, consumed by sinks, and transformed by processors which act as both sinks and sources. This model should be familiar to anyone who's worked with Spark, Spring XD or Apache Storm, and I'm not going to discuss it in detail here - there's &lt;a href="http://jet.hazelcast.org/performance/#DAGtoModelComputations"&gt;a good introduction on the Hazelcast Jet website&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The second layer maps a computation defined using the Java 8 Streams API into a DAG. With some small tweaks, we can run the same code as before, and have it distributed over a cluster of Hazelcast instances:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;JetTest&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;strings&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;IntStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;range&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1000000&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapToObj&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;randomString&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toArray&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;[]::&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="nf"&gt;randomString&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;Random&lt;/span&gt; &lt;span class="n"&gt;random&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;ThreadLocalRandom&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;current&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;bytes&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[&lt;/span&gt;&lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="o"&gt;];&lt;/span&gt;
        &lt;span class="n"&gt;random&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nextBytes&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bytes&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;String&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bytes&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;JetInstance&lt;/span&gt; &lt;span class="n"&gt;jet1&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;JetInstance&lt;/span&gt; &lt;span class="n"&gt;jet2&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;JetInstance&lt;/span&gt; &lt;span class="n"&gt;jet3&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;JetInstance&lt;/span&gt; &lt;span class="n"&gt;jet4&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="nd"&gt;@Before&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;startJet&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;jet1&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Jet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newJetInstance&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;jet2&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Jet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newJetInstance&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;jet3&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Jet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newJetInstance&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;jet4&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Jet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newJetInstance&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@After&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;stopJet&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;jet1&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;shutdown&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;jet2&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;shutdown&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;jet3&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;shutdown&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;jet4&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;shutdown&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="nd"&gt;@Test&lt;/span&gt;
    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;hashInJet&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;IStreamList&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;input&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;jet1&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"strings"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strings&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;forEach&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;input:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;add&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

        &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;hash&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Arrays&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapToInt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;String:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;hashCode&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toArray&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

        &lt;span class="nc"&gt;System&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;out&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;println&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;hash&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Perhaps unsurprisingly, given the work needed to co-ordinate the work within the cluster and push serialised data and bytecode out to each node that will perform it, this is a lot slower than doing the same thing in-memory within a single process. The ability to re-use a well-known programming model to define distributed computations, however, is very appealing. Let's give it something more interesting to do than hash strings.&lt;/p&gt;

&lt;h2&gt;
  
  
  Replaying Events
&lt;/h2&gt;

&lt;p&gt;Suppose we have a stream of events from many sources, interleaved and possibly out of order, and we want to resolve this into a map of time-ordered event histories, grouped by event source. In other words, given:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;At 3pm, Lightbulb 1 was switched on.
At 1pm, Lightbulb 2 was switched off.
At 2pm, Lightbulb 1 was plugged in.
At 11am, Lightbulb 2 was plugged in.
At 10am, Lightbulb 3 was plugged in.
At 12pm, Lightbulb 2 was switched on...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;we would like to have&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Lightbulb 1:
    2pm: plugged in
    3pm: switched on

Lightbulb 2:
    11am: plugged in
    12pm: switched on
    1pm: switched off

Lightbulb 3:
    10am: plugged in...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This can be expressed using the Streams API as follows:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;Map&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;AggregateId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;SortedSet&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Event&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;eventHistories&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;groupingBy&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="nl"&gt;Event:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;getAggregateId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;toCollection&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;TreeSet&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Event&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;eventTimestampComparator&lt;/span&gt;&lt;span class="o"&gt;)));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Given a function that takes the event history of a single lightbulb, and returns the lightbulb's current state, we can transform our disorderly event stream into something even more useful: a key/value look-up giving the current state of every lightbulb:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;IMap&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;AggregateId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;LightbulbState&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;eventHistories&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;groupingByToIMap&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="nl"&gt;Event:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;getAggregateId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;collectingAndThen&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;toCollection&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;TreeSet&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Event&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;eventTimestampComparator&lt;/span&gt;&lt;span class="o"&gt;)),&lt;/span&gt;
                &lt;span class="n"&gt;eventHistory&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;stateModel&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCurrentState&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventHistory&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note the change of signature from &lt;code&gt;Map&lt;/code&gt; to &lt;code&gt;IMap&lt;/code&gt; (a Hazelcast distributed key/value map), which means that the results will be stored in a map distributed over the entire Hazelcast grid. This means that instead of having to serialize all of the results back to the client and assemble them into a local &lt;code&gt;Map&lt;/code&gt;, Hazelcast can keep the results of each computation in the partition of the data grid where the computation was performed. This has the potential to be very useful in an event-sourcing system where we want to have a fast cached lookup for the current states of aggregates, especially if we want the cache to be distributed over a grid of servers.&lt;/p&gt;

&lt;p&gt;Because the Streams API defines computations as terminating with collection operations - similar to the "reduce" stage in map/reduce processing - it seems that using Hazelcast Jet with this API is best suited for batch computation over a streaming data source, rather than continuous stream processing operations such as computing a rolling average or detecting anomalous patterns of events within a rolling time window. However, on first impressions Hazelcast Jet provides a familiar and convenient programming API to a powerful distributed computation engine, which has the potential to replace Spark or Storm for some use cases.&lt;/p&gt;

</description>
      <category>java8</category>
      <category>streams</category>
      <category>hazelcast</category>
      <category>hazelcastjet</category>
    </item>
  </channel>
</rss>
