<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Tomasz Nurkiewicz</title>
    <description>The latest articles on DEV Community by Tomasz Nurkiewicz (@tnurkiewicz).</description>
    <link>https://dev.to/tnurkiewicz</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3049%2FjHB6g8Nk.jpg</url>
      <title>DEV Community: Tomasz Nurkiewicz</title>
      <link>https://dev.to/tnurkiewicz</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/tnurkiewicz"/>
    <language>en</language>
    <item>
      <title>Small scale stream processing kata: thread pools</title>
      <dc:creator>Tomasz Nurkiewicz</dc:creator>
      <pubDate>Sat, 11 Feb 2017 23:46:33 +0000</pubDate>
      <link>https://dev.to/tnurkiewicz/small-scale-stream-processing-kata-thread-pools</link>
      <guid>https://dev.to/tnurkiewicz/small-scale-stream-processing-kata-thread-pools</guid>
      <description>&lt;p&gt;&lt;a href="http://www.nurkiewicz.com/2015/10/geecon-programming-contest-answers.html"&gt;Once again&lt;/a&gt; I prepared a programming contest on &lt;a href="http://2016.geecon.org/"&gt;GeeCON 2016&lt;/a&gt; for &lt;a href="http://www.4financeit.com"&gt;my company&lt;/a&gt;. This time the assignment required designing and optionally implementing a system given the following requirements:&lt;/p&gt;




&lt;p&gt;A system delivers around one thousand events per second. Each &lt;code&gt;Event&lt;/code&gt; has at least two attributes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;clientId&lt;/code&gt; - we expect up to a few events per second per client&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;UUID&lt;/code&gt; - globally unique&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Consuming one event takes about 10 milliseconds. Design a consumer of such a stream that:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;allows processing events in real time&lt;/li&gt;
&lt;li&gt;events related to one client should be processed sequentially and in order, i.e. you cannot parallelize events for the same &lt;code&gt;clientId&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;if duplicated &lt;code&gt;UUID&lt;/code&gt; appeared within 10 seconds, drop it. Assume duplicates will not appear after 10 seconds&lt;/li&gt;
&lt;/ol&gt;




&lt;p&gt;There are a few important details in these requirements:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;1000 events/s and 10 ms to consume one event. Clearly we need at least 10 concurrent consumers in order to consume in near real-time.&lt;/li&gt;
&lt;li&gt;Events have natural aggregate ID (&lt;code&gt;clientId&lt;/code&gt;). During one second we can expect a few events for a given client and we are not allowed to process them concurrently or out of order.&lt;/li&gt;
&lt;li&gt;We must somehow ignore duplicated messages, most likely by remembering all unique IDs seen in the last 10 seconds. This gives about 10 thousand &lt;code&gt;UUID&lt;/code&gt;s to keep temporarily.&lt;/li&gt;
&lt;/ol&gt;
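&lt;p&gt;As a quick sanity check of the first and third points, the required concurrency and the size of the deduplication window follow directly from the numbers above (a back-of-envelope sketch of mine, not part of the original assignment):&lt;/p&gt;

```java
public class CapacityCheck {

    public static void main(String[] args) {
        int eventsPerSecond = 1_000;   // incoming rate from the requirements
        int processingMillis = 10;     // time to consume a single event

        // Little's law: concurrency needed = arrival rate * service time
        int requiredThreads = (int) (eventsPerSecond * (processingMillis / 1_000.0));
        System.out.println(requiredThreads);   // 10

        // UUIDs we must remember in order to deduplicate over a 10 second window
        int windowSeconds = 10;
        System.out.println(eventsPerSecond * windowSeconds);   // 10000
    }

}
```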

&lt;p&gt;In this article I'd like to guide you through a couple of correct solutions and a few broken attempts. You will also learn how to troubleshoot issues with a few precisely targeted metrics.&lt;/p&gt;

&lt;h1&gt;
  
  
  Naive sequential processing
&lt;/h1&gt;

&lt;p&gt;Let's tackle this problem in iterations. First we must make some assumptions about the API. Imagine it looks like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;interface EventStream {

    void consume(EventConsumer consumer);

}

@FunctionalInterface
interface EventConsumer {
    Event consume(Event event);
}

@Value
class Event {

    private final Instant created = Instant.now();
    private final int clientId;
    private final UUID uuid;

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A typical push-based API, similar to JMS. An important note is that &lt;code&gt;EventConsumer&lt;/code&gt; is blocking: the stream won't deliver a new &lt;code&gt;Event&lt;/code&gt; until the previous one has been consumed by &lt;code&gt;EventConsumer&lt;/code&gt;. This is just an assumption I made, and it does not drastically change the requirements; it is also how message listeners work in JMS. The naive implementation simply attaches a listener that takes around 10 milliseconds to complete:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class ClientProjection implements EventConsumer {

    @Override
    public Event consume(Event event) {
        Sleeper.randSleep(10, 1);
        return event;
    }

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Of course in real life this consumer would store something in a database, make a remote call, etc. I add a bit of randomness to the sleep time distribution to make manual testing more realistic:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class Sleeper {

    private static final Random RANDOM = new Random();

    static void randSleep(double mean, double stdDev) {
        final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);
        try {
            TimeUnit.MICROSECONDS.sleep((long) micros);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

//...

EventStream es = new EventStream();  //some real implementation here
es.consume(new ClientProjection());
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;It compiles and runs, but in order to figure out that the requirements aren't met we must plug in a few metrics. The most important metric is the latency of message consumption, measured as the time between message creation and the start of processing. We'll use &lt;a href="http://metrics.dropwizard.io/3.1.0/"&gt;Dropwizard Metrics&lt;/a&gt; for that:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class ClientProjection implements EventConsumer {

    private final ProjectionMetrics metrics;

    ClientProjection(ProjectionMetrics metrics) {
        this.metrics = metrics;
    }

    @Override
    public Event consume(Event event) {
        metrics.latency(Duration.between(event.getCreated(), Instant.now()));
        Sleeper.randSleep(10, 1);
        return event;
    }

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The &lt;code&gt;ProjectionMetrics&lt;/code&gt; class was extracted to separate responsibilities:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

@Slf4j
class ProjectionMetrics {

    private final Histogram latencyHist;

    ProjectionMetrics(MetricRegistry metricRegistry) {
        final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry)
                .outputTo(log)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();
        reporter.start(1, TimeUnit.SECONDS);
        latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));
    }

    void latency(Duration duration) {
        latencyHist.update(duration.toMillis());
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Now when you run the naive solution you'll quickly discover that the median latency, as well as the 99.9th percentile, keeps growing without bound:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;type=HISTOGRAM, [...] count=84,   min=0,  max=795,   mean=404.88540608274104, [...]
    median=414.0,   p75=602.0,   p95=753.0,   p98=783.0,   p99=795.0,   p999=795.0
type=HISTOGRAM, [...] count=182,  min=0,  max=1688,  mean=861.1706371990878,  [...]
    median=869.0,   p75=1285.0,  p95=1614.0,  p98=1659.0,  p99=1678.0,  p999=1688.0

[...30 seconds later...]

type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]
    median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;After 30 seconds our application processes events with an average delay of 15 seconds. Not entirely &lt;em&gt;real-time&lt;/em&gt;. Obviously the complete lack of concurrency is the reason. Our &lt;code&gt;ClientProjection&lt;/code&gt; event consumer takes around 10 ms to complete, so it can handle at most 100 events per second, whereas we need an order of magnitude more. We must scale &lt;code&gt;ClientProjection&lt;/code&gt; somehow. And we haven't even touched the other requirements!&lt;/p&gt;

&lt;h1&gt;
  
  
  Naive thread pool
&lt;/h1&gt;

&lt;p&gt;The most obvious solution is to invoke &lt;code&gt;EventConsumer&lt;/code&gt; from multiple threads. The easiest way to do this is by taking advantage of &lt;code&gt;ExecutorService&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NaivePool implements EventConsumer, Closeable {

    private final EventConsumer downstream;
    private final ExecutorService executorService;

    NaivePool(int size, EventConsumer downstream) {
        this.executorService = Executors.newFixedThreadPool(size);
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        executorService.submit(() -&amp;gt; downstream.consume(event));
        return event;
    }

    @Override
    public void close() throws IOException {
        executorService.shutdown();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We use the &lt;a href="https://en.wikipedia.org/wiki/Decorator_pattern"&gt;&lt;em&gt;decorator&lt;/em&gt; pattern&lt;/a&gt; here. The original &lt;code&gt;ClientProjection&lt;/code&gt;, implementing &lt;code&gt;EventConsumer&lt;/code&gt;, was correct. However, we wrap it with another implementation of &lt;code&gt;EventConsumer&lt;/code&gt; that adds concurrency. This allows us to compose complex behaviors without changing &lt;code&gt;ClientProjection&lt;/code&gt; itself. Such a design promotes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;loose coupling: various &lt;code&gt;EventConsumer&lt;/code&gt; implementations don't know about each other and can be combined freely&lt;/li&gt;
&lt;li&gt;single responsibility: each does one job and delegates to the next component&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://en.wikipedia.org/wiki/Open/closed_principle"&gt;open/closed principle&lt;/a&gt;: we can change the behavior of the system without modifying existing implementations.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The open/closed principle is typically achieved by injecting strategies or applying the template method pattern. Here it's even simpler. The whole wiring looks as follows:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;MetricRegistry metricRegistry =
        new MetricRegistry();
ProjectionMetrics metrics =
        new ProjectionMetrics(metricRegistry);
ClientProjection clientProjection =
        new ClientProjection(metrics);
NaivePool naivePool =
        new NaivePool(10, clientProjection);
EventStream es = new EventStream();
es.consume(naivePool);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Our carefully crafted metrics reveal that the situation is indeed much better:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]
    median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0
type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]
    median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0

[...30 seconds later...]

type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]
    median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Yet we still see a growing delay, though on a much smaller scale: after 30 seconds the mean latency reached 364 milliseconds. It keeps growing, so the problem is systematic. We... need... more... metrics. Notice that &lt;code&gt;NaivePool&lt;/code&gt; (you'll see soon why it's &lt;em&gt;naive&lt;/em&gt;) has exactly 10 threads at its disposal. This should be just about enough to handle a thousand events per second, each taking 10 ms to process. In reality we need a little extra processing power to avoid issues after garbage collection or during small load spikes. To prove that the thread pool is actually our bottleneck it's best to monitor its internal queue. This requires a little bit of work:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class NaivePool implements EventConsumer, Closeable {

    private final EventConsumer downstream;
    private final ExecutorService executorService;

    NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        LinkedBlockingQueue&amp;lt;Runnable&amp;gt; queue = new LinkedBlockingQueue&amp;lt;&amp;gt;();
        String name = MetricRegistry.name(ProjectionMetrics.class, "queue");
        Gauge&amp;lt;Integer&amp;gt; gauge = queue::size;
        metricRegistry.register(name, gauge);
        this.executorService = 
                new ThreadPoolExecutor(
                        size, size, 0L, TimeUnit.MILLISECONDS, queue);
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        executorService.submit(() -&amp;gt; downstream.consume(event));
        return event;
    }

    @Override
    public void close() throws IOException {
        executorService.shutdown();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The idea here is to create &lt;code&gt;ThreadPoolExecutor&lt;/code&gt; manually in order to provide a custom &lt;code&gt;LinkedBlockingQueue&lt;/code&gt; instance, which we can later use to monitor its length (see: &lt;a href="http://www.nurkiewicz.com/2014/11/executorservice-10-tips-and-tricks.html"&gt;ExecutorService - 10 tips and tricks&lt;/a&gt;).&lt;br&gt;
The &lt;code&gt;Gauge&lt;/code&gt; will periodically invoke &lt;code&gt;queue::size&lt;/code&gt; and report it to wherever you need it. Metrics confirm that the thread pool size was indeed a problem:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;type=GAUGE, name=[...].queue, value=35
type=GAUGE, name=[...].queue, value=52

[...30 seconds later...]

type=GAUGE, name=[...].queue, value=601
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The ever-growing queue of pending tasks hurts latency. Increasing the thread pool size from 10 to 20 finally yields decent results and no stalls. However, we still haven't addressed duplicates, nor protected against concurrent processing of events for the same &lt;code&gt;clientId&lt;/code&gt;.&lt;/p&gt;
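&lt;p&gt;The queueing-theory intuition behind that resize is simple: with 10 threads the pool runs at 100% utilization, so any hiccup makes the queue grow and it never fully drains. A rough sketch of the arithmetic (my own illustration, not from the original contest):&lt;/p&gt;

```java
public class PoolUtilization {

    public static void main(String[] args) {
        double arrivalRate = 1_000.0;  // events per second
        double serviceTime = 0.010;    // seconds to consume one event

        // utilization = arrival rate * service time / number of workers
        System.out.println(arrivalRate * serviceTime / 10);  // 1.0 - saturated, queue never drains
        System.out.println(arrivalRate * serviceTime / 20);  // 0.5 - comfortable headroom
    }

}
```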

&lt;h1&gt;
  
  
  Obscure locking
&lt;/h1&gt;

&lt;p&gt;Let's start with avoiding concurrent processing of events for the same &lt;code&gt;clientId&lt;/code&gt;. If two events arrive in very quick succession, both related to the same &lt;code&gt;clientId&lt;/code&gt;, &lt;code&gt;NaivePool&lt;/code&gt; will pick up both of them and start processing them concurrently. First, we'll at least detect such a situation by keeping a &lt;code&gt;Lock&lt;/code&gt; for each &lt;code&gt;clientId&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Slf4j
class FailOnConcurrentModification implements EventConsumer {

    private final ConcurrentMap&amp;lt;Integer, Lock&amp;gt; clientLocks = new ConcurrentHashMap&amp;lt;&amp;gt;();
    private final EventConsumer downstream;

    FailOnConcurrentModification(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        Lock lock = findClientLock(event);
        if (lock.tryLock()) {
            try {
                downstream.consume(event);
            } finally {
                lock.unlock();
            }
        } else {
            log.error("Client {} already being modified by another thread", event.getClientId());
        }
        return event;
    }

    private Lock findClientLock(Event event) {
        return clientLocks.computeIfAbsent(
                event.getClientId(),
                clientId -&amp;gt; new ReentrantLock());
    }

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This is definitely going in the wrong direction. The amount of complexity is overwhelming but running this code at least reveals there is an issue. The event processing pipeline looks as follows, with one decorator wrapping another:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ClientProjection clientProjection =
        new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification failOnConcurrentModification =
        new FailOnConcurrentModification(clientProjection);
NaivePool naivePool =
        new NaivePool(10, failOnConcurrentModification, metricRegistry);
EventStream es = new EventStream();

es.consume(naivePool);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Once in a while an error message will pop up, telling us that some other thread is already processing an event for the same &lt;code&gt;clientId&lt;/code&gt;. For each &lt;code&gt;clientId&lt;/code&gt; we associate a &lt;code&gt;Lock&lt;/code&gt; that we examine to figure out whether another thread is processing that client at the moment. As ugly as it is, we are actually quite close to a brutal solution. Rather than failing when the &lt;code&gt;Lock&lt;/code&gt; cannot be obtained because another thread is already processing some event, let's wait a little bit, hoping the &lt;code&gt;Lock&lt;/code&gt; will be released:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Slf4j
class WaitOnConcurrentModification implements EventConsumer {

    private final ConcurrentMap&amp;lt;Integer, Lock&amp;gt; clientLocks = new ConcurrentHashMap&amp;lt;&amp;gt;();
    private final EventConsumer downstream;
    private final Timer lockWait;

    WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));
    }

    @Override
    public Event consume(Event event) {
        try {
            final Lock lock = findClientLock(event);
            final Timer.Context time = lockWait.time();
            final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);
            time.stop();
            if (locked) {
                try {
                    downstream.consume(event);
                } finally {
                    //only unlock when the lock was actually acquired,
                    //otherwise unlock() would throw IllegalMonitorStateException
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            log.warn("Interrupted", e);
        }
        return event;
    }

    private Lock findClientLock(Event event) {
        return clientLocks.computeIfAbsent(
                event.getClientId(),
                clientId -&amp;gt; new ReentrantLock());
    }

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The idea is very similar, but instead of failing, &lt;code&gt;tryLock()&lt;/code&gt; waits up to 1 second, hoping the &lt;code&gt;Lock&lt;/code&gt; for the given client will be released. If two events arrive in very quick succession, one will obtain the &lt;code&gt;Lock&lt;/code&gt; and proceed, whereas the other will block, waiting for &lt;code&gt;unlock()&lt;/code&gt; to happen.&lt;/p&gt;
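&lt;p&gt;The core mechanism here is the timed &lt;code&gt;tryLock()&lt;/code&gt;. A minimal, self-contained illustration of its semantics (a toy example of mine, using a 100 ms timeout instead of 1 second to keep it fast):&lt;/p&gt;

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();  // main thread holds the lock, simulating an in-flight event

        Thread secondEvent = new Thread(() -> {
            try {
                // a second event for the same client waits up to 100 ms, then gives up
                boolean locked = lock.tryLock(100, TimeUnit.MILLISECONDS);
                System.out.println("locked: " + locked);  // locked: false
                if (locked) {
                    lock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        secondEvent.start();
        secondEvent.join();
        lock.unlock();
    }

}
```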

&lt;p&gt;Not only is this code really convoluted, it is probably also broken in many subtle ways. For example, what if two events for the same &lt;code&gt;clientId&lt;/code&gt; arrive almost exactly at the same time, but one was clearly first?&lt;br&gt;
Both events will ask for the &lt;code&gt;Lock&lt;/code&gt; at the same time and we have no guarantee which event will obtain the non-fair &lt;code&gt;Lock&lt;/code&gt; first, possibly consuming events out of order. There must be a better way...&lt;/p&gt;

&lt;h1&gt;
  
  
  Dedicated threads
&lt;/h1&gt;

&lt;p&gt;Let's take a step back and a very deep breath. How do you ensure things aren't happening concurrently?&lt;br&gt;
Well, just use one thread!&lt;br&gt;
As a matter of fact that's what we did in the very beginning, but the throughput was unsatisfactory. However, we don't care about concurrency across different &lt;code&gt;clientId&lt;/code&gt;s; we just have to make sure events with the same &lt;code&gt;clientId&lt;/code&gt; are always processed by the same thread!&lt;/p&gt;

&lt;p&gt;Maybe creating a map from &lt;code&gt;clientId&lt;/code&gt; to &lt;code&gt;Thread&lt;/code&gt; comes to your mind?&lt;br&gt;
Well, that would be overly simplistic. We would create thousands of threads, each idle most of the time as per the requirements (only a few events per second for a given &lt;code&gt;clientId&lt;/code&gt;). A good compromise is a fixed-size pool of threads, each responsible for a well-known subset of &lt;code&gt;clientId&lt;/code&gt;s. This way two different &lt;code&gt;clientId&lt;/code&gt;s may end up on the same thread, but the same &lt;code&gt;clientId&lt;/code&gt; will always be handled by the same thread. If two events for the same &lt;code&gt;clientId&lt;/code&gt; appear, they will both be routed to the same thread, thus avoiding concurrent processing. The implementation is embarrassingly simple:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class SmartPool implements EventConsumer, Closeable {

    private final List&amp;lt;ExecutorService&amp;gt; threadPools;
    private final EventConsumer downstream;

    SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        List&amp;lt;ExecutorService&amp;gt; list = IntStream
                .range(0, size)
                .mapToObj(i -&amp;gt; Executors.newSingleThreadExecutor())
                .collect(Collectors.toList());
        this.threadPools = new CopyOnWriteArrayList&amp;lt;&amp;gt;(list);
    }

    @Override
    public void close() throws IOException {
        threadPools.forEach(ExecutorService::shutdown);
    }

    @Override
    public Event consume(Event event) {
        final int threadIdx = event.getClientId() % threadPools.size();
        final ExecutorService executor = threadPools.get(threadIdx);
        executor.submit(() -&amp;gt; downstream.consume(event));
        return event;
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The crucial part is right at the end:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;int threadIdx = event.getClientId() % threadPools.size();
ExecutorService executor = threadPools.get(threadIdx);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This simple algorithm will always use the same single-threaded &lt;code&gt;ExecutorService&lt;/code&gt; for the same &lt;code&gt;clientId&lt;/code&gt;. Different IDs may end up in the same pool; for example when the pool size is &lt;code&gt;20&lt;/code&gt;, clients &lt;code&gt;7&lt;/code&gt;, &lt;code&gt;27&lt;/code&gt;, &lt;code&gt;47&lt;/code&gt;, etc. will use the same thread. This is OK, as long as one &lt;code&gt;clientId&lt;/code&gt; always uses the same thread. At this point no locking is necessary and sequential invocation is guaranteed, because events for the same client are always executed by the same thread. Side note: one thread per &lt;code&gt;clientId&lt;/code&gt; would not scale, but one actor per &lt;code&gt;clientId&lt;/code&gt; (e.g. in Akka) is a great idea that simplifies a lot.&lt;/p&gt;
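&lt;p&gt;You can convince yourself how the routing behaves with a few lines of code (a throwaway snippet of mine, not part of &lt;code&gt;SmartPool&lt;/code&gt;):&lt;/p&gt;

```java
public class RoutingDemo {

    public static void main(String[] args) {
        int poolSize = 20;
        int[] clientIds = {7, 27, 47, 8};
        for (int clientId : clientIds) {
            // the same clientId always maps to the same worker index
            int threadIdx = clientId % poolSize;
            System.out.println("client " + clientId + " goes to worker " + threadIdx);
        }
    }

}
```

Clients 7, 27 and 47 all land on worker 7, while client 8 gets worker 8. If &lt;code&gt;clientId&lt;/code&gt; could ever be negative, &lt;code&gt;Math.floorMod(clientId, poolSize)&lt;/code&gt; would be the safer choice, since &lt;code&gt;%&lt;/code&gt; returns negative remainders for negative operands.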

&lt;p&gt;By the way, to be extra safe, I plugged in a metric for the average queue size across all thread pools, which made the implementation longer:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class SmartPool implements EventConsumer, Closeable {

    private final List&amp;lt;LinkedBlockingQueue&amp;lt;Runnable&amp;gt;&amp;gt; queues;
    private final List&amp;lt;ExecutorService&amp;gt; threadPools;
    private final EventConsumer downstream;

    SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        this.queues = IntStream
                .range(0, size)
                .mapToObj(i -&amp;gt; new LinkedBlockingQueue&amp;lt;Runnable&amp;gt;())
                .collect(Collectors.toList());
        List&amp;lt;ThreadPoolExecutor&amp;gt; list = queues
                .stream()
                .map(q -&amp;gt; new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q))
                .collect(Collectors.toList());
        this.threadPools = new CopyOnWriteArrayList&amp;lt;&amp;gt;(list);
        metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge&amp;lt;Double&amp;gt;) this::averageQueueLength);
    }

    private double averageQueueLength() {
        double totalLength =
            queues
                .stream()
                .mapToDouble(LinkedBlockingQueue::size)
                .sum();
        return totalLength / queues.size();
    }

    //...

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;If you are paranoid you can even create one metric for each queue.&lt;/p&gt;

&lt;h1&gt;
  
  
  Deduplication and idempotency
&lt;/h1&gt;

&lt;p&gt;In a distributed environment it's quite common to receive duplicated events when your producer has &lt;em&gt;at least once&lt;/em&gt; delivery guarantees. The reasons behind such behavior are beyond the scope of this article, but we must learn how to live with that issue. One way is to attach a globally unique identifier (&lt;code&gt;UUID&lt;/code&gt;) to every message and make sure on the consumer side that messages with the same identifier aren't processed twice. Each &lt;code&gt;Event&lt;/code&gt; has such a &lt;code&gt;UUID&lt;/code&gt;. The most straightforward solution under our requirements is to simply store all seen &lt;code&gt;UUID&lt;/code&gt;s and verify on arrival that the received &lt;code&gt;UUID&lt;/code&gt; was never seen before. Using &lt;code&gt;ConcurrentHashMap&amp;lt;UUID, UUID&amp;gt;&lt;/code&gt; (there is no &lt;code&gt;ConcurrentHashSet&lt;/code&gt; in the JDK) as-is would lead to a memory leak, as we would keep accumulating more and more IDs over time. That's why we only look for duplicates in the last 10 seconds. You could technically have a &lt;code&gt;ConcurrentHashMap&amp;lt;UUID, Instant&amp;gt;&lt;/code&gt; that maps from &lt;code&gt;UUID&lt;/code&gt; to the timestamp when it was encountered, and use a background thread to remove elements older than 10 seconds. But if you are a happy Guava user, &lt;code&gt;Cache&amp;lt;UUID, UUID&amp;gt;&lt;/code&gt; with a declarative eviction policy will do the trick:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

class IgnoreDuplicates implements EventConsumer {

    private final EventConsumer downstream;

    private Cache&amp;lt;UUID, UUID&amp;gt; seenUuids = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();

    IgnoreDuplicates(EventConsumer downstream) {
        this.downstream = downstream;
    }

    @Override
    public Event consume(Event event) {
        final UUID uuid = event.getUuid();
        if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {
            return downstream.consume(event);
        } else {
            return event;
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
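&lt;p&gt;The &lt;code&gt;putIfAbsent()&lt;/code&gt; call is what makes the check-then-act sequence atomic: it returns &lt;code&gt;null&lt;/code&gt; only for the thread that inserted the key first. If you didn't need the time-based eviction, the JDK alone would get you quite far with &lt;code&gt;ConcurrentHashMap.newKeySet()&lt;/code&gt; (a simplified sketch of mine; it never expires entries, so over time it leaks memory):&lt;/p&gt;

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class DedupDemo {

    public static void main(String[] args) {
        // a concurrent Set view backed by ConcurrentHashMap (Java 8+; var needs Java 10+)
        var seenUuids = ConcurrentHashMap.newKeySet();

        UUID uuid = UUID.randomUUID();
        System.out.println(seenUuids.add(uuid));  // true  - first sighting, process the event
        System.out.println(seenUuids.add(uuid));  // false - duplicate, drop it
    }

}
```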

&lt;p&gt;Once again, to be safe in production, there are at least two metrics I can think of that might become useful: cache size and the number of duplicates discovered. Let's plug in these metrics as well:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class IgnoreDuplicates implements EventConsumer {

    private final EventConsumer downstream;
    private final Meter duplicates;

    private Cache&amp;lt;UUID, UUID&amp;gt; seenUuids = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.SECONDS)
            .build();

    IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {
        this.downstream = downstream;
        duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));
        metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge&amp;lt;Long&amp;gt;) seenUuids::size);
    }

    @Override
    public Event consume(Event event) {
        final UUID uuid = event.getUuid();
        if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {
            return downstream.consume(event);
        } else {
            duplicates.mark();
            return event;
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Finally we have all the pieces to build our solution. The idea is to compose pipeline from &lt;code&gt;EventConsumer&lt;/code&gt; instances wrapping each other:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;First we apply &lt;code&gt;IgnoreDuplicates&lt;/code&gt; to reject duplicates&lt;/li&gt;
&lt;li&gt;Then we call &lt;code&gt;SmartPool&lt;/code&gt; that always pins a given &lt;code&gt;clientId&lt;/code&gt; to the same thread and executes the next stage in that thread&lt;/li&gt;
&lt;li&gt;Finally &lt;code&gt;ClientProjection&lt;/code&gt; is invoked that does the real business logic.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;You can optionally place &lt;code&gt;FailOnConcurrentModification&lt;/code&gt; step between &lt;code&gt;SmartPool&lt;/code&gt; and &lt;code&gt;ClientProjection&lt;/code&gt; for extra safety (concurrent modification shouldn't happen by design):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;ClientProjection clientProjection =
        new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification concurrentModification =
        new FailOnConcurrentModification(clientProjection);
SmartPool smartPool =
        new SmartPool(12, concurrentModification, metricRegistry);
IgnoreDuplicates withoutDuplicates =
        new IgnoreDuplicates(smartPool, metricRegistry);
EventStream es = new EventStream();
es.consume(withoutDuplicates);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;It took us a lot of work to come up with a relatively simple and well-structured (I hope you agree) solution. In the end, the best way to tackle concurrency issues is to... avoid concurrency, and run code that is subject to race conditions on one thread. This is also the idea behind Akka actors (one message processed at a time per actor) and RxJava (one message at a time processed by a &lt;code&gt;Subscriber&lt;/code&gt;).&lt;/p&gt;

&lt;p&gt;What we came up with so far is a combination of thread pools and a shared cache. This time we will implement the solution using RxJava. First of all, I never revealed how &lt;code&gt;EventStream&lt;/code&gt; is implemented, only giving the API:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;interface EventStream {

    void consume(EventConsumer consumer);

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;In fact for manual testing I built a simple RxJava stream that behaves like the system from the requirements:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@Slf4j
class EventStream {

    void consume(EventConsumer consumer) {
        observe()
            .subscribe(
                consumer::consume,
                e -&amp;gt; log.error("Error emitting event", e)
        );
    }

    Observable&amp;lt;Event&amp;gt; observe() {
        return Observable
                .interval(1, TimeUnit.MILLISECONDS)
                .delay(x -&amp;gt; Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS))
                .map(x -&amp;gt; new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID()))
                .flatMap(this::occasionallyDuplicate, 100)
                .observeOn(Schedulers.io());
    }

    private Observable&amp;lt;Event&amp;gt; occasionallyDuplicate(Event x) {
        final Observable&amp;lt;Event&amp;gt; event = Observable.just(x);
        if (Math.random() &amp;gt;= 0.01) {
            return event;
        }
        final Observable&amp;lt;Event&amp;gt; duplicated =
                event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);
        return event.concatWith(duplicated);
    }

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Understanding how this simulator works is not essential, but it is quite interesting. First we generate a steady stream of &lt;code&gt;Long&lt;/code&gt; values (&lt;code&gt;0&lt;/code&gt;, &lt;code&gt;1&lt;/code&gt;, &lt;code&gt;2&lt;/code&gt;...), one every millisecond (a thousand events per second), using the &lt;code&gt;interval()&lt;/code&gt; operator. Then we delay each event by a random amount of time between &lt;code&gt;0&lt;/code&gt; and &lt;code&gt;1_000&lt;/code&gt; microseconds with the &lt;code&gt;delay()&lt;/code&gt; operator. This way events appear at less predictable moments in time, which is a bit more realistic. Finally we map (using, ahem, the &lt;code&gt;map()&lt;/code&gt; operator) each &lt;code&gt;Long&lt;/code&gt; value to a random &lt;code&gt;Event&lt;/code&gt; with a &lt;code&gt;clientId&lt;/code&gt; somewhere between &lt;code&gt;1_000&lt;/code&gt; and &lt;code&gt;1_100&lt;/code&gt; (inclusive-exclusive).&lt;/p&gt;

&lt;p&gt;The last bit is interesting. We would like to simulate occasional duplicates. To do so we map every event (using &lt;code&gt;flatMap()&lt;/code&gt;) to itself in 99% of the cases. However, in 1% of the cases we return the event twice, with the second occurrence appearing between 10 milliseconds and 5 seconds later. In practice the duplicated instance of the event will appear after hundreds of other events, which makes the stream behave quite realistically.&lt;/p&gt;
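&lt;p&gt;The same duplication trick can be sketched without RxJava. This hypothetical &lt;code&gt;DuplicatingSource&lt;/code&gt; class (delays omitted, only the duplication logic shown) emits roughly 1% of events twice with the same &lt;code&gt;UUID&lt;/code&gt;:&lt;/p&gt;

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

// Hypothetical sketch of the simulator's duplication logic: about 1% of events
// are emitted a second time with an identical UUID.
class DuplicatingSource {
    private final Random random;

    DuplicatingSource(Random random) {
        this.random = random;
    }

    List<UUID> emit(int count) {
        List<UUID> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            UUID id = UUID.randomUUID();
            out.add(id);
            if (random.nextDouble() < 0.01) {  // ~1% of events get a duplicate
                out.add(id);                   // same UUID appears twice
            }
        }
        return out;
    }
}
```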

&lt;p&gt;There are two ways to interact with &lt;code&gt;EventStream&lt;/code&gt;: callback-based via &lt;code&gt;consume()&lt;/code&gt; and stream-based via &lt;code&gt;observe()&lt;/code&gt;. We can take advantage of &lt;code&gt;Observable&amp;lt;Event&amp;gt;&lt;/code&gt; to quickly build a processing pipeline very similar in functionality to the one from &lt;em&gt;part 1&lt;/em&gt;, but much simpler.&lt;/p&gt;

&lt;h1&gt;
  
  
  Missing backpressure
&lt;/h1&gt;

&lt;p&gt;The first naive approach to taking advantage of RxJava falls short very quickly:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;EventStream es = new EventStream();
EventConsumer clientProjection = new ClientProjection(
        new ProjectionMetrics(
                new MetricRegistry()));

es.observe()
        .subscribe(
                clientProjection::consume,
                e -&amp;gt; log.error("Fatal error", e)
        );
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;(&lt;code&gt;ClientProjection&lt;/code&gt;, &lt;code&gt;ProjectionMetrics&lt;/code&gt; et al. come from &lt;a href="http://www.nurkiewicz.com/2016/10/small-scale-stream-processing-kata-part.html"&gt;part 1&lt;/a&gt;.) We get a &lt;code&gt;MissingBackpressureException&lt;/code&gt; almost instantaneously, and that was expected. Remember how our first solution was lagging, handling events with more and more latency? RxJava tries to avoid that, as well as avoiding overflowing queues.&lt;br&gt;
&lt;code&gt;MissingBackpressureException&lt;/code&gt; is thrown because the consumer (&lt;code&gt;ClientProjection&lt;/code&gt;) is incapable of handling events in real time. This is &lt;em&gt;fail-fast&lt;/em&gt; behavior. The quickest solution is to move consumption to a separate thread pool, just like before, but using RxJava's facilities:&lt;/p&gt;
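&lt;p&gt;A rough plain-JDK analogy of this fail-fast behavior (a sketch, not RxJava internals): a bounded queue whose &lt;code&gt;add()&lt;/code&gt; throws the moment the producer outruns the consumer, much like &lt;code&gt;MissingBackpressureException&lt;/code&gt; signals an overwhelmed pipeline, while &lt;code&gt;offer()&lt;/code&gt; reports the overflow without throwing.&lt;/p&gt;

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Analogy only: a bounded buffer between a fast producer and a slow consumer.
class BoundedBuffer {
    final BlockingQueue<Integer> queue;

    BoundedBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Fail-fast publish: throws IllegalStateException when the buffer is full. */
    void publish(int event) {
        queue.add(event);
    }

    /** Best-effort publish: returns false instead of throwing when full. */
    boolean tryPublish(int event) {
        return queue.offer(event);
    }
}
```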

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;EventStream es = new EventStream();
EventConsumer clientProjection = new FailOnConcurrentModification(
        new ClientProjection(
                new ProjectionMetrics(
                        new MetricRegistry())));

es.observe()
        .flatMap(e -&amp;gt; clientProjection.consume(e, Schedulers.io()))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -&amp;gt; log.info("Processed {} events/s", c),
                e -&amp;gt; log.error("Fatal error", e)
        );
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;code&gt;EventConsumer&lt;/code&gt; interface has a helper method that can consume events asynchronously on a supplied &lt;code&gt;Scheduler&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@FunctionalInterface
interface EventConsumer {
    Event consume(Event event);

    default Observable&amp;lt;Event&amp;gt; consume(Event event, Scheduler scheduler) {
        return Observable
                .fromCallable(() -&amp;gt; this.consume(event))
                .subscribeOn(scheduler);
    }

}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;By consuming events using &lt;code&gt;flatMap()&lt;/code&gt; on a separate &lt;code&gt;Schedulers.io()&lt;/code&gt; scheduler, each consumption is invoked asynchronously. This time events are processed in near real time, but there is a bigger problem. I decorated &lt;code&gt;ClientProjection&lt;/code&gt; with &lt;code&gt;FailOnConcurrentModification&lt;/code&gt; for a reason. Events are consumed independently of each other, so it may happen that two events for the same &lt;code&gt;clientId&lt;/code&gt; are processed concurrently. Not good. Luckily, in RxJava solving this problem is much easier than with plain threads:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;es.observe()
        .groupBy(Event::getClientId)
        .flatMap(byClient -&amp;gt; byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume))
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -&amp;gt; log.info("Processed {} events/s", c),
                e -&amp;gt; log.error("Fatal error", e)
        );
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A little bit has changed. First of all, we group events by &lt;code&gt;clientId&lt;/code&gt;. This splits the single &lt;code&gt;Observable&lt;/code&gt; stream into a &lt;em&gt;stream of streams&lt;/em&gt;. Each substream, named &lt;code&gt;byClient&lt;/code&gt;, represents all events related to the same &lt;code&gt;clientId&lt;/code&gt;. Now if we map over this substream, we can be sure that events related to the same &lt;code&gt;clientId&lt;/code&gt; are never processed concurrently. The outer stream is lazy, so we must subscribe to it. Rather than subscribing to every event separately, we collect events every second and count them. This way we receive a single event of type &lt;code&gt;Integer&lt;/code&gt; every second, representing the number of events consumed per second.&lt;/p&gt;
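&lt;p&gt;What &lt;code&gt;groupBy()&lt;/code&gt; does to the stream can be illustrated with a plain-Java sketch (hypothetical &lt;code&gt;GroupByDemo&lt;/code&gt;, not RxJava code): a single sequence is split into per-&lt;code&gt;clientId&lt;/code&gt; sub-lists, and within each sub-list the original encounter order of events is preserved.&lt;/p&gt;

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only: splitting one ordered sequence into per-key subsequences,
// each of which retains the original relative order of its events.
class GroupByDemo {
    static class Event {
        final int clientId;
        final int seq;
        Event(int clientId, int seq) {
            this.clientId = clientId;
            this.seq = seq;
        }
    }

    static Map<Integer, List<Event>> byClient(List<Event> events) {
        // groupingBy keeps encounter order within each group
        return events.stream()
                .collect(Collectors.groupingBy(e -> e.clientId));
    }
}
```

Processing each sub-list on its own thread then gives per-client sequential consumption, which is the property the RxJava pipeline above relies on.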

&lt;h1&gt;
  
  
  Impure, non-idiomatic, error-prone, unsafe solution of deduplication using global state
&lt;/h1&gt;

&lt;p&gt;Now we must drop duplicate &lt;code&gt;UUID&lt;/code&gt;s. The simplest, yet very foolish, way of discarding duplicates is to take advantage of global state. We can simply filter out duplicates by looking them up in a cache available outside of the &lt;code&gt;filter()&lt;/code&gt; operator:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;final Cache&amp;lt;UUID, UUID&amp;gt; seenUuids = CacheBuilder.newBuilder()
        .expireAfterWrite(10, TimeUnit.SECONDS)
        .build();

es.observe()
        .filter(e -&amp;gt; seenUuids.getIfPresent(e.getUuid()) == null)
        .doOnNext(e -&amp;gt; seenUuids.put(e.getUuid(), e.getUuid()))
        .subscribe(
                clientProjection::consume,
                e -&amp;gt; log.error("Fatal error", e)
        );
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;If you want to monitor the usage of this mechanism, simply add a metric:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Meter duplicates = metricRegistry.meter("duplicates");

es.observe()
        .filter(e -&amp;gt; {
            if (seenUuids.getIfPresent(e.getUuid()) != null) {
                duplicates.mark();
                return false;
            } else {
                return true;
            }
        })
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Accessing global, and especially mutable, state from inside operators is very dangerous and undermines the sole purpose of RxJava: simplifying concurrency. Obviously we use a thread-safe &lt;code&gt;Cache&lt;/code&gt; from Guava, but in many cases it's easy to miss places where shared global mutable state is accessed from multiple threads. If you find yourself mutating some variable outside of the operator chain, be very careful.&lt;/p&gt;
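&lt;p&gt;If you do reach for shared state, prefer an atomic check-and-record step. A plain-JDK sketch (hypothetical class, not from this article): &lt;code&gt;Set.add()&lt;/code&gt; returns whether the element was new, combining the lookup and the write into a single atomic operation, which is safer than a separate &lt;code&gt;filter()&lt;/code&gt; plus &lt;code&gt;doOnNext()&lt;/code&gt; pair should the stream ever be consumed from multiple threads. Note this set grows forever; time-based eviction, as in the cache-backed approaches, would still be needed.&lt;/p&gt;

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: an atomic "have we seen this UUID before?" check. Unlike a separate
// lookup followed by an insert, Set.add() cannot let two threads both conclude
// the UUID is new.
class AtomicDeduplicator {
    private final Set<UUID> seen = ConcurrentHashMap.newKeySet();

    /** Returns true exactly once per UUID, atomically. */
    boolean firstTime(UUID uuid) {
        return seen.add(uuid);
    }
}
```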

&lt;h1&gt;
  
  
  Custom &lt;code&gt;distinct()&lt;/code&gt; operator in RxJava 1.x
&lt;/h1&gt;

&lt;p&gt;RxJava 1.x has a &lt;code&gt;distinct()&lt;/code&gt; operator that presumably does the job:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;es.observe()
        .distinct(Event::getUuid)
        .groupBy(Event::getClientId)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Unfortunately &lt;code&gt;distinct()&lt;/code&gt; stores all keys (&lt;code&gt;UUID&lt;/code&gt;s) internally in an ever-growing &lt;code&gt;HashSet&lt;/code&gt;, but we only care about duplicates from the last 10 seconds!&lt;br&gt;
By copy-pasting the implementation of &lt;code&gt;DistinctOperator&lt;/code&gt; I created a &lt;code&gt;DistinctEvent&lt;/code&gt; operator that takes advantage of Guava's cache to store only the last 10 seconds' worth of &lt;code&gt;UUID&lt;/code&gt;s. I intentionally hard-coded &lt;code&gt;Event&lt;/code&gt; in this operator, rather than making it more generic, to keep the code easier to understand:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;class DistinctEvent implements Observable.Operator&amp;lt;Event, Event&amp;gt; {
    private final Duration duration;

    DistinctEvent(Duration duration) {
        this.duration = duration;
    }

    @Override
    public Subscriber&amp;lt;? super Event&amp;gt; call(Subscriber&amp;lt;? super Event&amp;gt; child) {
        return new Subscriber&amp;lt;Event&amp;gt;(child) {
            final Map&amp;lt;UUID, Boolean&amp;gt; keyMemory = CacheBuilder.newBuilder()
                    .expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
                    .&amp;lt;UUID, Boolean&amp;gt;build().asMap();

            @Override
            public void onNext(Event event) {
                if (keyMemory.put(event.getUuid(), true) == null) {
                    child.onNext(event);
                } else {
                    request(1);
                }
            }

            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }

            @Override
            public void onCompleted() {
                child.onCompleted();
            }

        };
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The usage is fairly simple and the whole implementation (plus the custom operator) is as short as:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -&amp;gt; byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .window(1, TimeUnit.SECONDS)
        .flatMap(Observable::count)
        .subscribe(
                c -&amp;gt; log.info("Processed {} events/s", c),
                e -&amp;gt; log.error("Fatal error", e)
        );
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Actually it can be even shorter if you skip logging every second:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;es.observe()
        .lift(new DistinctEvent(Duration.ofSeconds(10)))
        .groupBy(Event::getClientId)
        .flatMap(byClient -&amp;gt; byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -&amp;gt; {},
                e -&amp;gt; log.error("Fatal error", e)
        );
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This solution is much shorter than the previous one based on thread pools and decorators. The only awkward part is the custom operator that avoids a memory leak by not storing too many historic &lt;code&gt;UUID&lt;/code&gt;s. Luckily, RxJava 2 comes to the rescue!&lt;/p&gt;

&lt;h1&gt;
  
  
  RxJava 2.x and more powerful built-in &lt;code&gt;distinct()&lt;/code&gt;
&lt;/h1&gt;

&lt;p&gt;I was actually &lt;em&gt;this&lt;/em&gt; close to submitting a PR to RxJava with a more powerful implementation of the &lt;code&gt;distinct()&lt;/code&gt; operator. But first I checked the &lt;code&gt;2.x&lt;/code&gt; branch, and there it was: a &lt;code&gt;distinct()&lt;/code&gt; that allows providing a custom &lt;code&gt;Collection&lt;/code&gt;, as opposed to a hard-coded &lt;code&gt;HashSet&lt;/code&gt;. Believe it or not, dependency inversion is not only about the Spring framework or Java EE. When a library allows you to provide a custom implementation of its internal data structure, this is also DI. First I create a helper method that builds a &lt;code&gt;Set&amp;lt;UUID&amp;gt;&lt;/code&gt; backed by a &lt;code&gt;Map&amp;lt;UUID, Boolean&amp;gt;&lt;/code&gt;, in turn backed by a &lt;code&gt;Cache&amp;lt;UUID, Boolean&amp;gt;&lt;/code&gt;. We sure do like delegation!&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private Set&amp;lt;UUID&amp;gt; recentUuids() {
    return Collections.newSetFromMap(
            CacheBuilder.newBuilder()
                    .expireAfterWrite(10, TimeUnit.SECONDS)
                    .&amp;lt;UUID, Boolean&amp;gt;build()
                    .asMap()
    );
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
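&lt;p&gt;If Guava were not available, the same Set-backed-by-Map delegation could be sketched with the JDK alone. The following hypothetical &lt;code&gt;ExpiringSet&lt;/code&gt; (far less efficient than Guava's cache, shown only to illustrate the idea) prunes stale entries on each &lt;code&gt;add()&lt;/code&gt;:&lt;/p&gt;

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stdlib-only sketch of recentUuids(): a "set" backed by a map of
// insertion timestamps, so entries older than the TTL are forgotten and the
// structure cannot grow without bound.
class ExpiringSet {
    private final Map<UUID, Long> timestamps = new ConcurrentHashMap<>();
    private final long ttlMillis;

    ExpiringSet(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if uuid was not present (or had expired); records it. */
    boolean add(UUID uuid) {
        long now = System.currentTimeMillis();
        // Drop entries older than the TTL before checking for membership
        timestamps.values().removeIf(t -> now - t > ttlMillis);
        return timestamps.putIfAbsent(uuid, now) == null;
    }

    int size() {
        return timestamps.size();
    }
}
```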

&lt;p&gt;Having this method, we can implement the whole task with a single expression:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;es.observe()
        .distinct(Event::getUuid, this::recentUuids)
        .groupBy(Event::getClientId)
        .flatMap(byClient -&amp;gt; byClient
                .observeOn(Schedulers.io())
                .map(clientProjection::consume)
        )
        .subscribe(
                e -&amp;gt; {},
                e -&amp;gt; log.error("Fatal error", e)
        );
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The elegance, the simplicity, the clarity!&lt;br&gt;
It reads almost like the problem statement:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;observe a stream of events&lt;/li&gt;
&lt;li&gt;take only distinct UUIDs into account&lt;/li&gt;
&lt;li&gt;group events by client&lt;/li&gt;
&lt;li&gt;for each client consume them (sequentially)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;I hope you enjoyed all these solutions and that you find them useful in your daily work.&lt;/p&gt;

</description>
      <category>java</category>
      <category>rxjava</category>
      <category>concurrency</category>
    </item>
  </channel>
</rss>
