
Event Replaying with Hazelcast Jet

Dominic Fox

(the following article is cross-posted from the OpenCredo Blog, where Dominic and other OpenCredo authors write about technologies and technology challenges that interest and excite them)

Introduction: Parallel Stream Processing

One of the stated intentions behind the design of Java 8's Streams API was to take better advantage of the multi-core processing power of modern computers. Operations that could be performed on a single, linear stream of values could also be run in parallel by splitting that stream into multiple sub-streams, and combining the results from processing each sub-stream as they became available.

For example, suppose we wanted to compute the hash of a large array of String values. The simplest method is to use Arrays.deepHashCode:

int hash = Arrays.deepHashCode(strings);

but we could also gather the hashes of all of the Strings in the array into an array of ints and then take the hash of that:

int hash = Arrays.hashCode(Arrays.stream(strings)
    .mapToInt(String::hashCode).toArray());

Having done this, we can then parallelise the hashing operation over all the threads in the default Java ForkJoinPool, by making the stream parallel:

int hash = Arrays.hashCode(Arrays.stream(strings).parallel()
    .mapToInt(String::hashCode).toArray());
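It's worth checking that these three expressions really do compute the same value. For an array of non-null Strings they should, because Arrays.deepHashCode and Arrays.hashCode(int[]) both fold element hashes together with the same 31 * result + hash recurrence. Here is a small sketch that compares them; the class and method names are mine, chosen for illustration only.

```java
import java.util.Arrays;

class HashEquivalence {

    // Hash the array directly; each element contributes its own hashCode().
    static int deepHash(String[] strings) {
        return Arrays.deepHashCode(strings);
    }

    // Map each String to its hash sequentially, then hash the resulting int[].
    static int linearHash(String[] strings) {
        return Arrays.hashCode(Arrays.stream(strings)
                .mapToInt(String::hashCode).toArray());
    }

    // As linearHash, but the per-String hashing runs on a parallel stream.
    // toArray() preserves encounter order, so the int[] is the same.
    static int parallelHash(String[] strings) {
        return Arrays.hashCode(Arrays.stream(strings).parallel()
                .mapToInt(String::hashCode).toArray());
    }

    public static void main(String[] args) {
        String[] strings = {"plugged in", "switched on", "switched off"};
        System.out.println(deepHash(strings) == linearHash(strings));
        System.out.println(linearHash(strings) == parallelHash(strings));
    }
}
```

Note that the stream versions assume there are no null elements: Arrays.deepHashCode treats a null as hash 0, whereas String::hashCode would throw a NullPointerException.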

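Here's a test which uses the ContiPerf performance testing library to gather some metrics on these three methods:

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

import org.databene.contiperf.PerfTest;
import org.databene.contiperf.junit.ContiPerfRule;
import org.junit.Rule;
import org.junit.Test;

public class PerformanceTest {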

    @Rule
    public final ContiPerfRule contiperfRule = new ContiPerfRule();

    private static final String[] strings = IntStream.range(0, 1000000)
            .mapToObj(i -> randomString())
            .toArray(String[]::new);

    private static String randomString() {
        Random random = ThreadLocalRandom.current();
        byte[] bytes = new byte[random.nextInt(100) + 1];
        random.nextBytes(bytes);
        return new String(bytes);
    }

    @Test
    @PerfTest(invocations = 10000, warmUp = 10000)
    public void hashWithDeepHashCode() {
        Arrays.deepHashCode(strings);
    }

    @Test
    @PerfTest(invocations = 10000, warmUp = 10000)
    public void hashWithLinearStream() {
        Arrays.hashCode(Arrays.stream(strings).mapToInt(String::hashCode).toArray());
    }

    @Test
    @PerfTest(invocations = 10000, warmUp = 10000)
    public void hashWithParallelStream() {
        Arrays.hashCode(Arrays.stream(strings).parallel().mapToInt(String::hashCode).toArray());
    }
}

and here are the results for the three approaches (ContiPerf reports timings in milliseconds):

com.opencredo.eventhose.PerformanceTest.hashWithParallelStream
samples: 8536
max:     19
average: 6.018158388003749
median:  6

com.opencredo.eventhose.PerformanceTest.hashWithDeepHashCode
samples: 9234
max:     29
average: 10.06670998483864
median:  11

com.opencredo.eventhose.PerformanceTest.hashWithLinearStream
samples: 9262
max:     40
average: 12.909090909090908
median:  13

Much as we would expect, linearly converting an array of Strings to an array of ints and then hashing it is somewhat slower than just using Arrays.deepHashCode: around 20% going by the medians (13 against 11), and closer to 30% going by the averages (12.9 against 10.1). But when we parallelise the hashing of each individual String, we see a significant speed-up (a median of 6) as more CPU cores are brought into play, each processing a subset of the values in our original array.
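To make "each processing a subset of the values" a little more concrete, here is a minimal sketch of the split-and-combine idea using Spliterator.trySplit(), the primitive that parallel streams rely on to divide up their input. It is an illustration rather than a description of what the ForkJoinPool actually schedules: the class and method names are hypothetical, both parts are processed on the calling thread, and it sums String lengths instead of hashing because a sum is trivially combined from partial results.

```java
import java.util.Arrays;
import java.util.Spliterator;

class SplitAndCombine {

    // Sum the lengths of all the Strings remaining in one sub-stream.
    static long sumLengths(Spliterator<String> part) {
        long[] total = {0};
        part.forEachRemaining(s -> total[0] += s.length());
        return total[0];
    }

    // Split the array's spliterator once, process each part, combine the results.
    static long splitSum(String[] strings) {
        Spliterator<String> rest = Arrays.spliterator(strings);
        // trySplit() hands back a prefix of the elements,
        // or null if the source is too small to split.
        Spliterator<String> prefix = rest.trySplit();
        long prefixSum = (prefix == null) ? 0 : sumLengths(prefix);
        return prefixSum + sumLengths(rest);
    }

    public static void main(String[] args) {
        String[] strings = {"plugged in", "switched on", "switched off"};
        long viaStream = Arrays.stream(strings).parallel()
                .mapToLong(String::length).sum();
        System.out.println(splitSum(strings) == viaStream);
    }
}
```

A real parallel stream keeps splitting recursively and hands the pieces to different worker threads, but the shape of the computation is the same.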

We're still limited, however, to the CPU and memory resources available to us on a single machine. What if we could parallelise stream processing tasks across a cluster of machines?

Hazelcast Jet

Hazelcast Jet builds two layers on top of the Hazelcast distributed data grid. The first maps a DAG (Directed Acyclic Graph) representing computational flow over the grid, providing a general-purpose processing model for distributed dataflow. Data is provided by sources, consumed by sinks, and transformed by processors which act as both sinks and sources. This model should be familiar to anyone who's worked with Spark, Spring XD or Apache Storm, and I'm not going to discuss it in detail here - there's a good introduction on the Hazelcast Jet website.

The second layer maps a computation defined using the Java 8 Streams API into a DAG. With some small tweaks, we can run the same code as before, and have it distributed over a cluster of Hazelcast instances:

public class JetTest {

    private static final String[] strings = IntStream.range(0, 1000000)
            .mapToObj(i -> randomString())
            .toArray(String[]::new);

    private static String randomString() {
        Random random = ThreadLocalRandom.current();
        byte[] bytes = new byte[random.nextInt(100) + 1];
        random.nextBytes(bytes);
        return new String(bytes);
    }

    private JetInstance jet1;
    private JetInstance jet2;
    private JetInstance jet3;
    private JetInstance jet4;

    @Before
    public void startJet() {
        jet1 = Jet.newJetInstance();
        jet2 = Jet.newJetInstance();
        jet3 = Jet.newJetInstance();
        jet4 = Jet.newJetInstance();
    }

    @After
    public void stopJet() {
        jet1.shutdown();
        jet2.shutdown();
        jet3.shutdown();
        jet4.shutdown();
    }

    @Test
    public void hashInJet() {
        IStreamList<String> input = jet1.getList("strings");
        Arrays.stream(strings).forEach(input::add);

        int hash = Arrays.hashCode(input.stream()
                .mapToInt(String::hashCode)
                .toArray());

        System.out.println(hash);
    }
}

Perhaps unsurprisingly, given the overhead of co-ordinating the cluster and pushing serialised data and bytecode out to each node that will run the computation, this is a lot slower than doing the same thing in-memory within a single process. The ability to re-use a well-known programming model to define distributed computations, however, is very appealing. Let's give it something more interesting to do than hash strings.

Replaying Events

Suppose we have a stream of events from many sources, interleaved and possibly out of order, and we want to resolve this into a map of time-ordered event histories, grouped by event source. In other words, given:

At 3pm, Lightbulb 1 was switched on.
At 1pm, Lightbulb 2 was switched off.
At 2pm, Lightbulb 1 was plugged in.
At 11am, Lightbulb 2 was plugged in.
At 10am, Lightbulb 3 was plugged in.
At 12pm, Lightbulb 2 was switched on...

we would like to have

Lightbulb 1:
    2pm: plugged in
    3pm: switched on

Lightbulb 2:
    11am: plugged in
    12pm: switched on
    1pm: switched off

Lightbulb 3:
    10am: plugged in...

This can be expressed using the Streams API as follows:

Map<AggregateId, SortedSet<Event>> eventHistories = events.stream()
        .collect(groupingBy(
            Event::getAggregateId,
            toCollection(() -> new TreeSet<Event>(eventTimestampComparator))));
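For anyone who wants to try this without a cluster, here is a self-contained sketch of the same grouping using only java.util.stream. The Event class is a hypothetical stand-in (the real Event and AggregateId types aren't shown here): it uses a String for the aggregate id and an hour on the 24-hour clock for the timestamp.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toCollection;

class EventHistories {

    // Hypothetical event type, for illustration only.
    static final class Event {
        final String aggregateId;  // stands in for AggregateId
        final int hour;            // stands in for a real timestamp
        final String description;

        Event(String aggregateId, int hour, String description) {
            this.aggregateId = aggregateId;
            this.hour = hour;
            this.description = description;
        }

        String getAggregateId() { return aggregateId; }
        int getHour() { return hour; }
        String getDescription() { return description; }
    }

    static final Comparator<Event> eventTimestampComparator =
            Comparator.comparingInt(Event::getHour);

    // The lightbulb events from the example above, in their jumbled order.
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("Lightbulb 1", 15, "switched on"),
                new Event("Lightbulb 2", 13, "switched off"),
                new Event("Lightbulb 1", 14, "plugged in"),
                new Event("Lightbulb 2", 11, "plugged in"),
                new Event("Lightbulb 3", 10, "plugged in"),
                new Event("Lightbulb 2", 12, "switched on"));
    }

    // Group by source; each group is a TreeSet ordered by timestamp.
    static Map<String, SortedSet<Event>> historiesOf(List<Event> events) {
        return events.stream().collect(groupingBy(
                Event::getAggregateId,
                toCollection(() -> new TreeSet<>(eventTimestampComparator))));
    }

    // Flatten one history into its descriptions, in time order.
    static List<String> descriptions(SortedSet<Event> history) {
        return history.stream()
                .map(Event::getDescription)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        historiesOf(sampleEvents()).forEach((id, history) ->
                System.out.println(id + ": " + descriptions(history)));
    }
}
```

One caveat with this design: a TreeSet treats two elements as duplicates when the comparator returns 0, so two events for the same source with identical timestamps would collapse into one. A comparator with a tie-breaker (a sequence number, say) avoids that.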

Given a function that takes the event history of a single lightbulb, and returns the lightbulb's current state, we can transform our disorderly event stream into something even more useful: a key/value look-up giving the current state of every lightbulb:

IMap<AggregateId, LightbulbState> currentStates = events.stream()
        .collect(groupingByToIMap(
            Event::getAggregateId,
            collectingAndThen(
                toCollection(() -> new TreeSet<Event>(eventTimestampComparator)),
                eventHistory -> stateModel.getCurrentState(eventHistory))));
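Here is a local sketch of that history-to-state step, again using plain java.util.stream so that it runs without Jet. Everything in it is an assumption made for illustration: the Event class, the LightbulbState enum, and a getCurrentState function that simply replays the events in order. It collects into an ordinary Map via groupingBy, standing in for the distributed IMap and groupingByToIMap.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toCollection;

class CurrentStates {

    // Hypothetical state model, for illustration only.
    enum LightbulbState { UNPLUGGED, OFF, ON }

    // Hypothetical event type: String id, hour on the 24-hour clock.
    static final class Event {
        final String aggregateId;
        final int hour;
        final String description;

        Event(String aggregateId, int hour, String description) {
            this.aggregateId = aggregateId;
            this.hour = hour;
            this.description = description;
        }

        String getAggregateId() { return aggregateId; }
        int getHour() { return hour; }
        String getDescription() { return description; }
    }

    static final Comparator<Event> BY_HOUR = Comparator.comparingInt(Event::getHour);

    // Replay a time-ordered history to arrive at the current state.
    static LightbulbState getCurrentState(SortedSet<Event> history) {
        LightbulbState state = LightbulbState.UNPLUGGED;
        for (Event event : history) {
            switch (event.getDescription()) {
                case "plugged in":
                case "switched off":
                    state = LightbulbState.OFF;
                    break;
                case "switched on":
                    state = LightbulbState.ON;
                    break;
                default:
                    break; // unknown events leave the state unchanged
            }
        }
        return state;
    }

    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("Lightbulb 1", 15, "switched on"),
                new Event("Lightbulb 2", 13, "switched off"),
                new Event("Lightbulb 1", 14, "plugged in"),
                new Event("Lightbulb 2", 11, "plugged in"),
                new Event("Lightbulb 3", 10, "plugged in"),
                new Event("Lightbulb 2", 12, "switched on"));
    }

    // Group by source, sort each group by time, then reduce it to a state.
    static Map<String, LightbulbState> currentStates(List<Event> events) {
        return events.stream().collect(groupingBy(
                Event::getAggregateId,
                collectingAndThen(
                        toCollection(() -> new TreeSet<Event>(BY_HOUR)),
                        CurrentStates::getCurrentState)));
    }

    public static void main(String[] args) {
        currentStates(sampleEvents()).forEach((id, state) ->
                System.out.println(id + " -> " + state));
    }
}
```

Because the replay only ever sees one source's events at a time, each group can be reduced independently of the others, which is what makes this computation a good fit for partitioning across a grid.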

Note the change of collector from groupingBy to groupingByToIMap, and the corresponding change of result type from Map to IMap (a Hazelcast distributed key/value map), which means that the results will be stored in a map distributed over the entire Hazelcast grid. Instead of having to serialize all of the results back to the client and assemble them into a local Map, Hazelcast can keep the results of each computation in the partition of the data grid where the computation was performed. This has the potential to be very useful in an event-sourcing system where we want to have a fast cached lookup for the current states of aggregates, especially if we want the cache to be distributed over a grid of servers.

Because the Streams API defines computations as terminating with collection operations - similar to the "reduce" stage in map/reduce processing - it seems that using Hazelcast Jet with this API is best suited for batch computation over a streaming data source, rather than continuous stream processing operations such as computing a rolling average or detecting anomalous patterns of events within a rolling time window. However, on first impressions Hazelcast Jet provides a familiar and convenient programming API to a powerful distributed computation engine, which has the potential to replace Spark or Storm for some use cases.
