<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: George Oliveira</title>
    <description>The latest articles on DEV Community by George Oliveira (@foolonthehill).</description>
    <link>https://dev.to/foolonthehill</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F495976%2F217a34c1-37f2-4496-b481-7e3f03cb1a49.jpeg</url>
      <title>DEV Community: George Oliveira</title>
      <link>https://dev.to/foolonthehill</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/foolonthehill"/>
    <language>en</language>
    <item>
      <title>Build a event-driven app with Micronaut, Kafka and Debezium</title>
      <dc:creator>George Oliveira</dc:creator>
      <pubDate>Sat, 24 Jul 2021 16:50:03 +0000</pubDate>
      <link>https://dev.to/foolonthehill/build-a-event-driven-app-with-micronaut-kafka-and-debezium-11be</link>
      <guid>https://dev.to/foolonthehill/build-a-event-driven-app-with-micronaut-kafka-and-debezium-11be</guid>
      <description>&lt;h2&gt;
  
  
  Intro
&lt;/h2&gt;

&lt;p&gt;In this article we'll be building a &lt;a href="https://medium.com/disney-streaming/event-driven-architecture-use-cases-9c123bbfe32f" rel="noopener noreferrer"&gt;event-driven app&lt;/a&gt; in Java using the Micronaut framework.&lt;/p&gt;

&lt;h2&gt;
  
  
  A quick word on event-driven architectures
&lt;/h2&gt;

&lt;p&gt;For the purposes of this article, it's fine to understand that:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;an &lt;strong&gt;event&lt;/strong&gt; is a representation of a change in an state in a component, e.g.: user updated his name, new message saved at the database, the same as "user sent a new message", which is the same as "new message sent from user A to user B", etc.&lt;/li&gt;
&lt;li&gt;a event-driven architecture is an architecture based on reaction to events.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In this kind of architecture we have 3 key actors:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Producers: the ones that create the events or, in other words, sends messages informing that something happened; these messages may contain a payload with relevant information for processing the event;&lt;/li&gt;
&lt;li&gt;Brokers: delivers the messages from producers to consumers&lt;/li&gt;
&lt;li&gt;Consumers: the ones that process (react to) the messages to perform a specific task.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Please note that a single service / app, may have different kinds of actors, e.g.: a chat backend app that consume new messages events and produces unread messages notification events.&lt;/p&gt;

&lt;p&gt;The nature of these architectures makes them ideal to be employed for async processing and/or at apps with high load.&lt;/p&gt;

&lt;p&gt;There are many resources that address every details and aspects of event-driven architectures, it's use cases, pros, cons, etc. I'll find them they &lt;a href="https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/" rel="noopener noreferrer"&gt;linked&lt;/a&gt; among the article.&lt;/p&gt;

&lt;h2&gt;
  
  
  A real world example app
&lt;/h2&gt;

&lt;p&gt;I was trying to come up with a "real-world example app" of an event-driven architecture that covered the basic aspects of such architectures and was simple enough to implement. It was when I remembered one example that Design Data-Intensive Applications uses to describe the concept of "load" on a system: the "fan-out" of tweets at Twitter. &lt;/p&gt;

&lt;p&gt;I saw it as the example I was looking for. Maybe not as simple as I wanted to, but that's ok... I guess, please tell me later hahaha.&lt;/p&gt;

&lt;p&gt;Anyways, what is this "fan-out" thing? Twitter has two main operations:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Post a tweet: a user publishes a new message to it's followers (4.6 k requests/sec on average, over 12 k requests/sec at peak)&lt;/li&gt;
&lt;li&gt;Home timeline: a user views tweets published recently by users it follows (300 k
requests/sec)&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Twitter's scaling problems are not due to tweet posts, since it's ok to handle 12k write requests, but because of the "fan-out" to deliver it's tweets to users timelines, since each user follows many people and each user is followed by many people.&lt;/p&gt;

&lt;p&gt;Fan-out is a term borrowed from electronic engineering, where it describes the number of logic gate inputs that are attached to another gate’s output. The output needs to supply enough current to drive all the attached inputs. In transaction processing systems, we use it to describe the number of requests to other services that we need to make in order to serve one incoming request.&lt;/p&gt;

&lt;p&gt;Consider the following relational schema:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3tq8s6t4e29jfvaqts74.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3tq8s6t4e29jfvaqts74.png" alt="Tweets relational schema"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Probably the simplest approach to implement these operations would be to&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Post a tweet: simply insert the new tweet into the global tweets table.&lt;/li&gt;
&lt;li&gt;Home timeline: find all people the user follows and find their recent tweets. Something like:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;tweets&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;users&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;tweets&lt;/span&gt;
&lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;users&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;tweets&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;sender_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;users&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;
&lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;follows&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;follows&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;followee_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;users&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;
&lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;follows&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;follower_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;CURRENT_USER&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;tweets&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nb"&gt;timestamp&lt;/span&gt;
&lt;span class="k"&gt;LIMIT&lt;/span&gt; &lt;span class="n"&gt;TIMELINE_SIZE&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Although simple, this approach is not as fast as we would like, given the fan-out stuff.&lt;/p&gt;

&lt;p&gt;Another approach (and the one that &lt;a href="https://www.infoq.com/presentations/Twitter-Timeline-Scalability/" rel="noopener noreferrer"&gt;Twitter switched to after struggling to keep up with timeline loads&lt;/a&gt;, btw) is to keep, for each user, a cache for it's home timeline:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuoj3kekli83sqmja8dvn.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuoj3kekli83sqmja8dvn.png" alt="Twitter's fan-out approach to deliver timeline tweets"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Post a tweet: find all people that follows the tweet author and insert the new tweet at their home timeline caches.&lt;/li&gt;
&lt;li&gt;Home timeline: just read from the cache.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The second approach works because we have almost twice timelines reads as we have tweet posts, so it's ok to make an extra work when publishing.&lt;/p&gt;

&lt;h2&gt;
  
  
  Architecture for what we'll build
&lt;/h2&gt;

&lt;p&gt;We'll build a back-end system that implements the second approach.&lt;/p&gt;

&lt;p&gt;This system should allow Users to (1) post tweets and (2) retrieve their home timelines. For simplicity, we'll not handle user creation, nor authentication / authorization, assuming that another component / service handles it so that we can focus on (1) and (2).&lt;/p&gt;

&lt;p&gt;The diagram below illustrates the components of our system and how the data flows between them:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fy8nt7md5ae4dz9uilu66.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fy8nt7md5ae4dz9uilu66.png" alt="Components from our app and how they communicate with each other"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;the Tweets API allows our users to post / edit / delete tweets&lt;/li&gt;
&lt;li&gt;our database will be a PostgreSQL instance&lt;/li&gt;
&lt;li&gt;we'll use Debezium for change data capture with a Kafka Connect to publish tweet events to Kafka, which we'll use as a broker.&lt;/li&gt;
&lt;li&gt;we'll do so by using the "outbox pattern"&lt;/li&gt;
&lt;li&gt;the Tweets Delivery Worker, that will update user's home timeline caches upon new tweet events consumed from Kafka&lt;/li&gt;
&lt;li&gt;the Timeline Cache may simply be a in-memory cache, such as a Redis instance, and timeline requests are made to Tweets API, which, then, reads the cache to retrieve timeline tweets data and return it back to the user&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Complete project
&lt;/h2&gt;

&lt;p&gt;You'll find the complete code available &lt;a href="https://github.com/foolOnTheHill/tweets" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Implementation
&lt;/h2&gt;

&lt;p&gt;We'll use the Micronaut framework. &lt;/p&gt;

&lt;h2&gt;
  
  
  Setup
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Dependencies
&lt;/h3&gt;

&lt;p&gt;To setup locally our external dependencies (like Postgres, Debezium, Kafka and Redis)  we'll use Docker Compose.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;version&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2.2'&lt;/span&gt;
&lt;span class="na"&gt;services&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;postgres&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;postgres&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;postgres:11.2-alpine'&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;5432:5432&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;POSTGRES_DB=tweets&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;POSTGRES_PASSWORD=pass&lt;/span&gt;
    &lt;span class="na"&gt;healthcheck&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;test&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;CMD-SHELL"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;pg_isready&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;-U&lt;/span&gt;&lt;span class="nv"&gt; &lt;/span&gt;&lt;span class="s"&gt;postgres"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
      &lt;span class="na"&gt;interval&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;5s&lt;/span&gt;
      &lt;span class="na"&gt;timeout&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;3s&lt;/span&gt;
      &lt;span class="na"&gt;retries&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;7&lt;/span&gt;
  &lt;span class="na"&gt;redis&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redis&lt;/span&gt;
      &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;redis:6.0-alpine'&lt;/span&gt;
      &lt;span class="na"&gt;hostname&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redis&lt;/span&gt;
      &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;6379:6379'&lt;/span&gt;
  &lt;span class="na"&gt;zookeeper&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;zookeeper&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;confluentinc/cp-zookeeper:4.0.3'&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;2181:2181"&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;ZOOKEEPER_CLIENT_PORT=2181&lt;/span&gt;
  &lt;span class="na"&gt;kafka&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;confluentinc/cp-kafka:4.0.3'&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;9092:9092"&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_ADVERTISED_LISTENERS&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;PLAINTEXT://kafka:9092&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_ZOOKEEPER_CONNECT&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;zookeeper:2181"&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_BROKER_ID&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
      &lt;span class="na"&gt;KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;zookeeper&lt;/span&gt;
  &lt;span class="na"&gt;schema-registry&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;schema-registry&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;confluentinc/cp-schema-registry:4.0.3'&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;SCHEMA_REGISTRY_HOST_NAME=schema-registry&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;8081:8081"&lt;/span&gt;
  &lt;span class="na"&gt;wait-for-dependencies&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;dadarek/wait-for-dependencies&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;wait-for-dependencies&lt;/span&gt;
    &lt;span class="na"&gt;scale&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;
    &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;redis:6379 schema-registry:8081 kafka:9092 zookeeper:2181 postgres:5432&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Database setup
&lt;/h2&gt;

&lt;p&gt;I like using &lt;a href="https://dbdiagram.io/" rel="noopener noreferrer"&gt;https://dbdiagram.io&lt;/a&gt; to create my DB schema diagrams. It's quick and simple and has options to export the schema to SQL. 🆒&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/Build%2520a%2520event-driven%2520app%2520with%2520Micronaut%2C%2520Kafka%2520and%2520f293f4cef94e4b5780e0e04267344063%2FUntitled.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/Build%2520a%2520event-driven%2520app%2520with%2520Micronaut%2C%2520Kafka%2520and%2520f293f4cef94e4b5780e0e04267344063%2FUntitled.png" alt="Build%20a%20event-driven%20app%20with%20Micronaut,%20Kafka%20and%20f293f4cef94e4b5780e0e04267344063/Untitled.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This schema is a huge simplification of what a "real-world app" would use, but have just what we need to implement our system and to exemplify a few concepts.&lt;/p&gt;

&lt;p&gt;We have the Users table, along with the Follows relationship table, that records the users that a user follows, as the name suggests, and the global Tweets table (another suggestive name here :P).&lt;/p&gt;

&lt;h3&gt;
  
  
  Migrations
&lt;/h3&gt;

&lt;p&gt;Any changes to the database are called &lt;em&gt;migrations.&lt;/em&gt; We keep these changes into versioned files so that we can make use of all goods version control gives us: keep track of how the db schema changed over time, allowing us to rebuild our database from scratch, making clear the current state of our database and easier to undo things when we need to (rollbacks), etc.&lt;/p&gt;

&lt;p&gt;These concepts were borrowed from &lt;a href="https://flywaydb.org/documentation/getstarted/why" rel="noopener noreferrer"&gt;Flyway&lt;/a&gt;, the tool we'll use to manage our migrations, and you can read more about them &lt;a href="https://flywaydb.org/documentation/concepts/migrations" rel="noopener noreferrer"&gt;here&lt;/a&gt;, with illustrated examples and all.&lt;/p&gt;

&lt;p&gt;Here's the SQL file exported from dbdiagram and that we'll use as our &lt;code&gt;V1__Init.sql&lt;/code&gt; migration:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;"users"&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="nv"&gt;"id"&lt;/span&gt; &lt;span class="n"&gt;BIGSERIAL&lt;/span&gt; &lt;span class="k"&gt;UNIQUE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"username"&lt;/span&gt; &lt;span class="nb"&gt;varchar&lt;/span&gt; &lt;span class="k"&gt;UNIQUE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;"username"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;"follows"&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="nv"&gt;"follower_id"&lt;/span&gt; &lt;span class="nb"&gt;bigint&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"followee_id"&lt;/span&gt; &lt;span class="nb"&gt;bigint&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"follower_id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nv"&gt;"followee_id"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;"tweets"&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="nv"&gt;"id"&lt;/span&gt; &lt;span class="nb"&gt;SERIAL&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"sender_id"&lt;/span&gt; &lt;span class="nb"&gt;bigint&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"text"&lt;/span&gt; &lt;span class="nb"&gt;text&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="nv"&gt;"timestamp"&lt;/span&gt; &lt;span class="nb"&gt;timestamp&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;"follows"&lt;/span&gt; &lt;span class="k"&gt;ADD&lt;/span&gt; &lt;span class="k"&gt;FOREIGN&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"follower_id"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;REFERENCES&lt;/span&gt; &lt;span class="nv"&gt;"users"&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;"follows"&lt;/span&gt; &lt;span class="k"&gt;ADD&lt;/span&gt; &lt;span class="k"&gt;FOREIGN&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"followee_id"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;REFERENCES&lt;/span&gt; &lt;span class="nv"&gt;"users"&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="nv"&gt;"tweets"&lt;/span&gt; &lt;span class="k"&gt;ADD&lt;/span&gt; &lt;span class="k"&gt;FOREIGN&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"sender_id"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;REFERENCES&lt;/span&gt; &lt;span class="nv"&gt;"users"&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Flyway setup
&lt;/h4&gt;

&lt;p&gt;We have the option to run Flyway from within our app container, but we won't do it. Instead, we'll run flyway from it's own container. We'll do this to avoid concurrency issues: imagine we have dozens of instances of our app starting up, each one running flyway to validate and update the migrations, feel free to imagine the chaos created in case of issues while migrating as well.&lt;/p&gt;

&lt;p&gt;Doing so is simple and we'll do using the &lt;a href="https://hub.docker.com/r/flyway/flyway" rel="noopener noreferrer"&gt;Flyway Docker image&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Place the migration file at &lt;code&gt;postgres/migrations&lt;/code&gt; and add the migration container setup to the &lt;code&gt;docker-compose.yml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;  &lt;span class="na"&gt;migration&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;container_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;migration&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;flyway/flyway:6.0.2-alpine'&lt;/span&gt;
    &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;-url=jdbc:postgresql://postgres:5432/tweets -user=postgres -password=pass migrate&lt;/span&gt;
    &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./postgres/migrations:/flyway/sql:Z&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="na"&gt;postgres&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
        &lt;span class="na"&gt;condition&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;service_healthy&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we can connect to the local postgres instance and check that the tables were created:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;psql &lt;span class="nt"&gt;-h&lt;/span&gt; localhost &lt;span class="nt"&gt;-d&lt;/span&gt; tweets &lt;span class="nt"&gt;-U&lt;/span&gt; postgres                   
Password &lt;span class="k"&gt;for &lt;/span&gt;user postgres: 
psql &lt;span class="o"&gt;(&lt;/span&gt;12.6, server 11.2&lt;span class="o"&gt;)&lt;/span&gt;
Type &lt;span class="s2"&gt;"help"&lt;/span&gt; &lt;span class="k"&gt;for &lt;/span&gt;help.

&lt;span class="nv"&gt;tweets&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="c"&gt;# \dt&lt;/span&gt;
                 List of relations
 Schema |         Name          | Type  |  Owner   
&lt;span class="nt"&gt;--------&lt;/span&gt;+-----------------------+-------+----------
 public | flyway_schema_history | table | postgres
 public | follows               | table | postgres
 public | tweets                | table | postgres
 public | &lt;span class="nb"&gt;users&lt;/span&gt;                 | table | postgres
&lt;span class="o"&gt;(&lt;/span&gt;4 rows&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Creating tweets
&lt;/h2&gt;

&lt;p&gt;The &lt;a href="https://docs.micronaut.io/2.2.0/guide/index.html" rel="noopener noreferrer"&gt;Micronaut guide&lt;/a&gt; is very detailed and helped me a lot while developing this example API. It has lots of examples with good coverage for everyday use of the framework.&lt;/p&gt;

&lt;h3&gt;
  
  
  Data Access Layer (DAL)
&lt;/h3&gt;

&lt;p&gt;This layer will be in charge of creating an abstraction for manipulating the data at the database or any other persistence mechanism. We'll do that by creating some components:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Models: that create an abstraction for the entities being manipulated and it's relationships. We'll use Hibernate to create these abstractions.&lt;/li&gt;
&lt;li&gt;Data Transfer Objects (DTOs): classes that encapsulate data that will transfer between layers (I know that's not the &lt;a href="https://martinfowler.com/eaaCatalog/dataTransferObject.html" rel="noopener noreferrer"&gt;Martin Fowler's definition&lt;/a&gt;...)&lt;/li&gt;
&lt;li&gt;Data Access Objects (DAOs): these are abstractions to the persistence mechanism. This will expose data operations without exposing details of the database. So we'll be able to update and retrieve data easily. We'll use JPA to provide these operations.&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Models
&lt;/h4&gt;

&lt;p&gt;Here, we used Lombok and Java Persistence annotations to create our Tweet model, with a default constructor, getters, setters and especifying the database schema, table and it's fields.&lt;/p&gt;

&lt;p&gt;We also used the &lt;code&gt;@EqualsAndHashCode&lt;/code&gt; with &lt;code&gt;@EqualsAndHashCode.Exclude&lt;/code&gt; at the &lt;code&gt;id&lt;/code&gt; to make our tests easier when comparing models, since the &lt;code&gt;id&lt;/code&gt; field is generated automatically.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.models&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.sql.Timestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Entity&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.GeneratedValue&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.GenerationType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Table&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.AccessLevel&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.EqualsAndHashCode&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Getter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.RequiredArgsConstructor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Setter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.experimental.FieldDefaults&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Entity&lt;/span&gt;
&lt;span class="nd"&gt;@Table&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"public"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"tweets"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@Getter&lt;/span&gt;
&lt;span class="nd"&gt;@Setter&lt;/span&gt;
&lt;span class="nd"&gt;@RequiredArgsConstructor&lt;/span&gt;
&lt;span class="nd"&gt;@FieldDefaults&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;level&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;AccessLevel&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;PRIVATE&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@EqualsAndHashCode&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Tweet&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Id&lt;/span&gt;
  &lt;span class="nd"&gt;@GeneratedValue&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strategy&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;GenerationType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;IDENTITY&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nd"&gt;@EqualsAndHashCode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Exclude&lt;/span&gt;
  &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;senderId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;text&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nc"&gt;Timestamp&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The best resource about Hibernate available is &lt;a href="https://vladmihalcea.com/postgresql-serial-column-hibernate-identity/" rel="noopener noreferrer"&gt;this&lt;/a&gt; blog.&lt;/p&gt;

&lt;h4&gt;
  
  
  DTO (Data Transfer Object)
&lt;/h4&gt;

&lt;p&gt;We'll use Lombok to create our Tweet DTO with a builder:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.LocalDateTime&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Builder&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Value&lt;/span&gt;
&lt;span class="nd"&gt;@Builder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;toBuilder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;senderId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;text&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="nc"&gt;LocalDateTime&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Mappers
&lt;/h4&gt;

&lt;p&gt;Mappers are components that make conversions between classes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.mappers&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.models.Tweet&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.sql.Timestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetMapper&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="nf"&gt;fromModel&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Tweet&lt;/span&gt; &lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;senderId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSenderId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;text&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getText&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTimestamp&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toInstant&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;atOffset&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;UTC&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toLocalDateTime&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Tweet&lt;/span&gt; &lt;span class="nf"&gt;fromDto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Tweet&lt;/span&gt; &lt;span class="n"&gt;tweet&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Tweet&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getId&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setSenderId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSenderId&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setText&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getText&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setTimestamp&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Timestamp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;from&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTimestamp&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toInstant&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;UTC&lt;/span&gt;&lt;span class="o"&gt;)));&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  DAO (Data Access Object)
&lt;/h4&gt;

&lt;p&gt;Here we'll use the JPA annotations and extend the &lt;code&gt;JpaRepository&lt;/code&gt; interface for our model:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dal.dao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.models.Tweet&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.data.annotation.Repository&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.data.jpa.repository.JpaRepository&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Repository&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;TweetsDao&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;JpaRepository&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Tweet&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To add more database access abstractions we change this DAO file, for example: let's say we want to add a method to retrieve all tweets by a particular sender, we could add a method signature:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Tweet&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;findAllBySenderId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;senderId&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;More details from possibilities available at Micronaut Data are described &lt;a href="https://micronaut-projects.github.io/micronaut-data/latest/guide/" rel="noopener noreferrer"&gt;here&lt;/a&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  DAL
&lt;/h4&gt;

&lt;p&gt;In our DAL abstraction, we provide a method to persist a tweet at our database.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dal.dao.TweetsDao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.models.Tweet&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.transaction.Transactional&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Singleton&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetsDal&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TweetsDao&lt;/span&gt; &lt;span class="n"&gt;tweetsDao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Transactional&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="nf"&gt;persistTweet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Tweet&lt;/span&gt; &lt;span class="n"&gt;tweet&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;TweetMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromDto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;Tweet&lt;/span&gt; &lt;span class="n"&gt;persistedTweet&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tweetsDao&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;save&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TweetMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromModel&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;persistedTweet&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Transactions ensures that our methods executions are atomic. The &lt;a href="https://www.devmedia.com.br/conheca-o-spring-transactional-annotations/32472" rel="noopener noreferrer"&gt;classic example&lt;/a&gt; is:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="nc"&gt;UserTransaction&lt;/span&gt; &lt;span class="n"&gt;utx&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;entityManager&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTransaction&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

&lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;utx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;begin&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="n"&gt;businessLogic&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="n"&gt;utx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;commit&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;utx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;rollback&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;begin()&lt;/code&gt; call starts a transaction, and everything fro now on is considered atomic. When the &lt;code&gt;commit()&lt;/code&gt; happens, then the information is persisted and the transaction is finished. If any error happens inside &lt;code&gt;businessLogic()&lt;/code&gt; , the &lt;code&gt;catc()&lt;/code&gt; flow is triggered and then a rollback happens, ensuring nothing is persisted and so the transaction also finishes.&lt;/p&gt;

&lt;p&gt;We used &lt;code&gt;@Transactional&lt;/code&gt; so that we can benefit from the &lt;a href="https://docs.micronaut.io/2.2.0/guide/index.html#springAop" rel="noopener noreferrer"&gt;Micronaut's &lt;code&gt;HibernateTransactionManager&lt;/code&gt;&lt;/a&gt; that &lt;a href="https://dzone.com/articles/how-does-spring-transactional" rel="noopener noreferrer"&gt;handles transactions management&lt;/a&gt; for us. &lt;/p&gt;

&lt;h2&gt;
  
  
  The API
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Service
&lt;/h3&gt;

&lt;p&gt;Our Tweets Service instance uses the DAL to persist a tweet constructed using the data from the POST request body:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.services&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.mappers.TweetRequestMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dal.TweetsDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Singleton&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetsService&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TweetsDal&lt;/span&gt; &lt;span class="n"&gt;tweetsDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="nf"&gt;publishTweet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;PostTweetRequestDto&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;TweetRequestMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromPostRequest&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;persistedTweetDto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tweetsDal&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;persistTweet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;persistedTweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This body has the format of:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.dtos&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.core.annotation.Introspected&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.validation.constraints.NotBlank&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.validation.constraints.NotNull&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.validation.constraints.Positive&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Builder&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Value&lt;/span&gt;
&lt;span class="nd"&gt;@Builder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;toBuilder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@Introspected&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;PostTweetRequestDto&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@NotNull&lt;/span&gt; &lt;span class="nd"&gt;@NotBlank&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;senderId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="nd"&gt;@NotNull&lt;/span&gt; &lt;span class="nd"&gt;@NotBlank&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;text&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="nd"&gt;@NotNull&lt;/span&gt; &lt;span class="nd"&gt;@Positive&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Again, we use Lombok to create a builder for this class along with some validation constraints for it's fields.&lt;/p&gt;

&lt;p&gt;It's mapper is:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.mappers&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.Instant&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.LocalDateTime&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetRequestMapper&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="nf"&gt;fromPostRequest&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;PostTweetRequestDto&lt;/span&gt; &lt;span class="n"&gt;postTweetRequestDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;senderId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;postTweetRequestDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSenderId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;text&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;postTweetRequestDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getText&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;timestamp&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="nc"&gt;LocalDateTime&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;from&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="nc"&gt;Instant&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;ofEpochMilli&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;postTweetRequestDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTimestamp&lt;/span&gt;&lt;span class="o"&gt;()).&lt;/span&gt;&lt;span class="na"&gt;atZone&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;UTC&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Controller
&lt;/h3&gt;

&lt;p&gt;We use the &lt;code&gt;@Controller&lt;/code&gt; annotation to create our Tweets Controller with our POST &lt;code&gt;/tweets&lt;/code&gt; route. We also use the &lt;code&gt;@Validated&lt;/code&gt; annotation to ensure constraints over fields are applied.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.controllers&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.services.TweetsService&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.HttpResponse&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.annotation.Body&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.annotation.Controller&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.annotation.Post&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.validation.Validated&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Objects&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.validation.Valid&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Validated&lt;/span&gt;
&lt;span class="nd"&gt;@Controller&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"/tweets"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetsController&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TweetsService&lt;/span&gt; &lt;span class="n"&gt;tweetsService&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Post&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;HttpResponse&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;postTweet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Valid&lt;/span&gt; &lt;span class="nd"&gt;@Body&lt;/span&gt; &lt;span class="nc"&gt;PostTweetRequestDto&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweet&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tweetsService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;publishTweet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Objects&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;nonNull&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;HttpResponse&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;created&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getId&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;HttpResponse&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;notFound&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This controller uses our Service to publish the tweet.&lt;/p&gt;

&lt;p&gt;We can build the project by running:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;./gradlew build &lt;span class="nt"&gt;-x&lt;/span&gt; &lt;span class="nb"&gt;test&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, run it by doing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;java &lt;span class="nt"&gt;-jar&lt;/span&gt; build/libs/tweets-all.jar
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And, finally, call the API by doing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-L&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; POST &lt;span class="s1"&gt;'http://localhost:8080/tweets'&lt;/span&gt; &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s1"&gt;'Content-Type: application/json'&lt;/span&gt; &lt;span class="nt"&gt;--data-raw&lt;/span&gt; &lt;span class="s1"&gt;'{
    "sender_id": 12356,
    "text": "xalala",
    "timestamp": 1619626150979
}'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Please note that, as the Tweets table references the Users table, the &lt;code&gt;senderId&lt;/code&gt; must be a valid user, so, first we must populate the Users table manually:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;psql &lt;span class="nt"&gt;-h&lt;/span&gt; localhost &lt;span class="nt"&gt;-d&lt;/span&gt; tweets &lt;span class="nt"&gt;-U&lt;/span&gt; postgres                   
Password &lt;span class="k"&gt;for &lt;/span&gt;user postgres: 
psql &lt;span class="o"&gt;(&lt;/span&gt;12.6, server 11.2&lt;span class="o"&gt;)&lt;/span&gt;
Type &lt;span class="s2"&gt;"help"&lt;/span&gt; &lt;span class="k"&gt;for &lt;/span&gt;help.

&lt;span class="nv"&gt;tweets&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="c"&gt;# INSERT INTO users(username) VALUES ('my_username');&lt;/span&gt;
INSERT 0 1
&lt;span class="nv"&gt;tweets&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="c"&gt;# SELECT id, username FROM users WHERE username = 'my_username';&lt;/span&gt;
 &lt;span class="nb"&gt;id&lt;/span&gt; |  username   
&lt;span class="nt"&gt;----&lt;/span&gt;+-------------
  1 | my_username
&lt;span class="o"&gt;(&lt;/span&gt;1 row&lt;span class="o"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Publishing tweet events
&lt;/h2&gt;

&lt;p&gt;Now that we are able to handle requests to publish tweets, how do we trigger the tweet event to be published to Kafka?&lt;/p&gt;

&lt;h3&gt;
  
  
  Protocol schemas
&lt;/h3&gt;

&lt;p&gt;We'll define a generic "event" schema that will be used by Debezium to publish the events that will trigger our system's flows to Kafka. This schema will be defined using Apache Avro:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;protocols/avro/events/key.avro&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "type": "record",
    "name": "Key",
    "namespace": "com.georgeoliveira.events",
    "fields": [
      {
        "name": "aggregate_id",
        "type": [
          "null",
          "string"
        ],
        "default": null
      }
    ]
  }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;protocols/avro/events/value.avro&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "type": "record",
    "name": "Value",
    "namespace": "com.georgeoliveira.events",
    "fields": [
      {
        "name": "event_id",
        "doc": "A valid V4 UUID. Each event has a unique id.",
        "type": [
          "null",
          "string"
        ],
        "default": null
      },
      {
        "name": "aggregate_id",
        "doc": "The id of the whole aggregate that had any of the nested entities or the root entity edited.",
        "type": [
          "null",
          "string"
        ],
        "default": null
      },
      {
        "name": "type",
        "doc": "The type of the event, eg. \"create\" or \"update\".",
        "type": [
          "null",
          "string"
        ],
        "default": null
      },
      {
        "name": "aggregate",
        "doc": "The whole aggregate that had any of the nested entities or the root entity edited.",
        "type": [
          "null",
          "string"
        ],
        "default": null
      }
    ]
  }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://avro.apache.org/docs/current/spec.html" rel="noopener noreferrer"&gt;https://avro.apache.org/docs/current/spec.html&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The fields comments give you an explanation of the utility and importance of each field, but let's expand a discussion on the &lt;code&gt;aggregate&lt;/code&gt; and &lt;code&gt;type&lt;/code&gt; fields. &lt;/p&gt;

&lt;p&gt;The &lt;code&gt;aggregate&lt;/code&gt; will contain the payload with the relevant data needed to process the event. So, for example, if the we're developing a chat app and the event is "new message sent", the &lt;code&gt;aggregate&lt;/code&gt; could be a JSON like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"sender_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"recipient_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;....&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Our aggregates for the tweets events will have the tweet payload:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"sender_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"text"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"timestamp"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="err"&gt;...&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We use the &lt;code&gt;type&lt;/code&gt; field to store the type of event that occurred so that different consumers can decide how to process it, e.g., the &lt;code&gt;post_tweet&lt;/code&gt; event may trigger the delivery processing, but the &lt;code&gt;edit_tweet&lt;/code&gt; may not.&lt;/p&gt;

&lt;p&gt;In order to create a schema for the aggregate payload, we define a &lt;a href="https://developers.google.com/protocol-buffers/docs/proto3" rel="noopener noreferrer"&gt;protocol buffer&lt;/a&gt; message type to represent a Tweet:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;protocols/proto/tweets/tweet.proto&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;syntax = "proto3";

package com.georgeoliveira.tweets.proto;
option java_outer_classname = "TweetProtobuf";

message Tweet {
    int64 id = 1;
    int64 sender_id = 2;
    string text = 3;
    int64 timestamp = 4;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;So whenever a event is published, we ensure that the &lt;code&gt;aggregate&lt;/code&gt; field has the schema that the consumers expect. Also, these schemas can be versioned, so we can keep track of it's evolution.&lt;/p&gt;

&lt;p&gt;The protocols folder can be a &lt;a href="https://rogerdudler.github.io/git-guide/" rel="noopener noreferrer"&gt;git&lt;/a&gt; repository that is included in our system as &lt;a href="https://git-scm.com/book/en/v2/Git-Tools-Submodules" rel="noopener noreferrer"&gt;submodule&lt;/a&gt;, this way it's development can continue independently of the app and other related components can include them too.&lt;/p&gt;

&lt;h2&gt;
  
  
  Outbox pattern
&lt;/h2&gt;

&lt;p&gt;We  have defined the event schema and protobuffers, and all, but for now you must be wondering "how the hell do we actually publish the event????". Well, here is where the Outbox Pattern comes in. &lt;/p&gt;

&lt;p&gt;In this pattern, for each entity whose changes over it triggers any behaviour in our system, we have a database table called "The Outbox Table" of that entity. In our example case, for every new tweet published, we want to trigger the delivery flow, so that we'll need a "Tweet Outbox Table" for the entity "Tweet" from the table tweets.&lt;/p&gt;

&lt;p&gt;This table must be generic so that it has the same schema as the &lt;code&gt;Value&lt;/code&gt; avro schema that we defined earlier, as shown in our second migration &lt;code&gt;V2__AddTweetsOutboxTable.sql&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;EXTENSION&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="nv"&gt;"uuid-ossp"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;tweets_outbox&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;event_id&lt;/span&gt; &lt;span class="n"&gt;UUID&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;uuid_generate_v4&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
    &lt;span class="n"&gt;aggregate_id&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="nb"&gt;TEXT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;aggregate&lt;/span&gt; &lt;span class="n"&gt;JSONB&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;created_at&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt; &lt;span class="k"&gt;WITHOUT&lt;/span&gt; &lt;span class="nb"&gt;TIME&lt;/span&gt; &lt;span class="k"&gt;ZONE&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="n"&gt;NOW&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Along with the outboxes, we'll need a Change Data Capture (CDC) app to monitor changes on the outbox tables and publish new entries as messages to our broker. This way, whenever we want to publish an event, we simply insert it at the outbox table.&lt;/p&gt;

&lt;p&gt;The CDC app we'll use is Debezium. We'll use the Debezium PostgreSQL Connector as a Kafka Connecttor, so that the messages are published to Kafka. To do, so we need to:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Configure our PostgreSQL instance to allow connections from Debezium.&lt;/li&gt;
&lt;li&gt;Setup the Kafka Connector with the Debezium PostgreSQL Connector plugin&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  PostgreSQL setup
&lt;/h3&gt;

&lt;p&gt;First thing we need to to is to enable Logical Decoding at our PG instance:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Logical decoding is the process of extracting all persistent changes to a database's tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database's internal state.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;We do this by installing this plugin called &lt;a href="https://github.com/eulerto/wal2json" rel="noopener noreferrer"&gt;wal2json&lt;/a&gt;, which is done by following it's README's instructions, that describes how to enable logical replication. &lt;/p&gt;

&lt;p&gt;It's important to know these steps in case you need to perform these steps in a managed or VM instance at yout job or whatever. &lt;/p&gt;

&lt;p&gt;For the purposes of this article, we'll use a Postgres Docker image provided by Debezium that already comes with these instalations and configurations: &lt;a href="https://github.com/debezium/docker-images/tree/master/postgres/12-alpine" rel="noopener noreferrer"&gt;https://github.com/debezium/docker-images/tree/master/postgres/12-alpine&lt;/a&gt; . &lt;/p&gt;

&lt;p&gt;This image repository is also a good summary of all configurations needed.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://www.postgresql.org/docs/12/logicaldecoding-explanation.html" rel="noopener noreferrer"&gt;https://www.postgresql.org/docs/12/logicaldecoding-explanation.html&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Kafka Connect setup
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://turkogluc.com/postgresql-capture-data-change-with-debezium/" rel="noopener noreferrer"&gt;This article&lt;/a&gt; explains the steps to setup and run the Kafka Connect with the Debezium plugin manually in a way that could not be summarized here without missing anything.&lt;/p&gt;

&lt;p&gt;For the purposes of this article, we'll use the Debezium Connect Docker image &lt;a href="https://github.com/debezium/docker-images/tree/master/connect/1.5" rel="noopener noreferrer"&gt;https://github.com/debezium/docker-images/tree/master/connect/1.5&lt;/a&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;  &lt;span class="na"&gt;connect&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;debezium/connect:1.5'&lt;/span&gt;
    &lt;span class="na"&gt;ports&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;8083:8083"&lt;/span&gt;
    &lt;span class="na"&gt;environment&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;BOOTSTRAP_SERVERS=kafka:9092&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;GROUP_ID=1&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;CONFIG_STORAGE_TOPIC=connect_configs&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;OFFSET_STORAGE_TOPIC=connect_offsets&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;STATUS_STORAGE_TOPIC=connect_statuses&lt;/span&gt;
    &lt;span class="na"&gt;depends_on&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;kafka&lt;/span&gt;
      &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;postgres&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For more detailes, please refer to &lt;a href="https://turkogluc.com/postgresql-capture-data-change-with-debezium/" rel="noopener noreferrer"&gt;this&lt;/a&gt; article.&lt;/p&gt;

&lt;h3&gt;
  
  
  Create the Debezium connector
&lt;/h3&gt;

&lt;p&gt;Next, we need to setup the connector that will listen to changes at the Tweets Table. For that, we'll use the Debezium's REST API:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-d&lt;/span&gt; @&lt;span class="s2"&gt;"connector-config.json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
&lt;span class="nt"&gt;-X&lt;/span&gt; POST http://connect:8083/connectors
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;where &lt;code&gt;connector-config.json&lt;/code&gt; is:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"tweets_outbox_connector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"config"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"connector.class"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.connector.postgresql.PostgresConnector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"slot.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"tweets_outbox_connector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"unwrap,ValueToKey,SetKeySchema,SetValueSchema"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.unwrap.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.debezium.transforms.ExtractNewRecordState"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.ValueToKey.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"org.apache.kafka.connect.transforms.ValueToKey"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.ValueToKey.fields"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"aggregate_id"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.SetValueSchema.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.SetValueSchema.schema.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"com.georgeoliveira.events.Value"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.SetKeySchema.type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"org.apache.kafka.connect.transforms.SetSchemaMetadata$Key"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"transforms.SetKeySchema.schema.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"com.georgeoliveira.events.Key"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"key.converter.schema.registry.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"http://schema-registry:8081/"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"io.confluent.connect.avro.AvroConverter"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"value.converter.schema.registry.url"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"http://schema-registry:8081/"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"plugin.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"wal2json_rds"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.server.name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgres"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.dbname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"tweets"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.hostname"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgres"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.port"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"5432"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.user"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgres"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"database.password"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"pass"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"schema.include.list"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"table.include.list"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public.tweets_outbox"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And we can see the connector status by running&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl http://connect:8083/connectors/tweets_outbox_connector/status
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It's important to note that if everything goes wrong with the postgres or Kafka that Debezium is connected to, the connector will stop working and will not return to the "running" state on it's own. We'll need to manually recreate it.&lt;/p&gt;

&lt;p&gt;To delete a connector, besides calling the DELETE method at Debezium's API, by doing:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; DELETE http://debezium.debezium/connectors/tweets_outbox_connector &lt;span class="nt"&gt;-v&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;we need to delete the postgres replication slots:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="n"&gt;pg_drop_replication_slot&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'tweets_outbox_connector'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To test that outbox events are being published, let's insert some data into the outbox table and try to consume from kafka:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;psql &lt;span class="nt"&gt;-h&lt;/span&gt; localhost &lt;span class="nt"&gt;-d&lt;/span&gt; tweets &lt;span class="nt"&gt;-U&lt;/span&gt; postgres
Password &lt;span class="k"&gt;for &lt;/span&gt;user postgres: 
psql &lt;span class="o"&gt;(&lt;/span&gt;12.6&lt;span class="o"&gt;)&lt;/span&gt;
Type &lt;span class="s2"&gt;"help"&lt;/span&gt; &lt;span class="k"&gt;for &lt;/span&gt;help.

&lt;span class="nv"&gt;tweets&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="c"&gt;# insert into tweets_outbox(aggregate_id, type, aggregate) values ('1', 'test', '{"test":"this is a test"}');&lt;/span&gt;
INSERT 0 1
&lt;span class="nv"&gt;tweets&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="c"&gt;#&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Using &lt;code&gt;kafkacat&lt;/code&gt; to consume from the Kafka topic &lt;code&gt;postgres.public.tweets_outbox&lt;/code&gt;, we get something like:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;kafkacat &lt;span class="nt"&gt;-C&lt;/span&gt; &lt;span class="nt"&gt;-b&lt;/span&gt; kafka:9092 &lt;span class="nt"&gt;-t&lt;/span&gt; postgres.public.tweets_outbox
H71dbed94-c3db-410e-b7a0-7b0a120e8617test4&lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="s2"&gt;"test"&lt;/span&gt;: &lt;span class="s2"&gt;"this is a test"&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;����߹�
% Reached end of topic postgres.public.tweets_outbox &lt;span class="o"&gt;[&lt;/span&gt;0] at offset 1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Publishing the tweet event
&lt;/h3&gt;

&lt;p&gt;Whenever a tweet is posted, we publish this event by inserting the related data at the outbox table. &lt;/p&gt;

&lt;p&gt;We can also publish this event by using a &lt;a href="https://www.postgresql.org/docs/9.1/sql-createtrigger.html" rel="noopener noreferrer"&gt;database trigger&lt;/a&gt;. In our tweets example, which is very simple, it works nicelly, but for more complex schemas and it's relationships, things get more complicated and setting up these trigger rules can be a headache. It's simpler, then, to just insert at the outbox table whenever we want to, using it's own DAL. &lt;/p&gt;

&lt;p&gt;This DAL structure is very similar to the one we already defined for Tweets.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Tweet Outbox Model&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.models&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.vladmihalcea.hibernate.type.json.JsonBinaryType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.sql.Timestamp&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.UUID&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Column&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Entity&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.GeneratedValue&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.GenerationType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Table&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.AccessLevel&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.EqualsAndHashCode&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Getter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.RequiredArgsConstructor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Setter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.experimental.FieldDefaults&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.hibernate.annotations.Type&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.hibernate.annotations.TypeDef&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.hibernate.annotations.TypeDefs&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Entity&lt;/span&gt;
&lt;span class="nd"&gt;@Table&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"public"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"tweets_outbox"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@Getter&lt;/span&gt;
&lt;span class="nd"&gt;@Setter&lt;/span&gt;
&lt;span class="nd"&gt;@RequiredArgsConstructor&lt;/span&gt;
&lt;span class="nd"&gt;@FieldDefaults&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;level&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;AccessLevel&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;PRIVATE&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@EqualsAndHashCode&lt;/span&gt;
&lt;span class="nd"&gt;@TypeDefs&lt;/span&gt;&lt;span class="o"&gt;({&lt;/span&gt;&lt;span class="nd"&gt;@TypeDef&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"jsonb"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;typeClass&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;JsonBinaryType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;)})&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetOutbox&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Id&lt;/span&gt;
  &lt;span class="nd"&gt;@Column&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"event_id"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nd"&gt;@GeneratedValue&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strategy&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;GenerationType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;AUTO&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nd"&gt;@EqualsAndHashCode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Exclude&lt;/span&gt;
  &lt;span class="no"&gt;UUID&lt;/span&gt; &lt;span class="n"&gt;eventId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Column&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"aggregate_id"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;aggregateId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Column&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"type"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;type&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Type&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;type&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"jsonb"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nd"&gt;@Column&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"aggregate"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;columnDefinition&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"jsonb"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Column&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"created_at"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;insertable&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nd"&gt;@EqualsAndHashCode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Exclude&lt;/span&gt;
  &lt;span class="nc"&gt;Timestamp&lt;/span&gt; &lt;span class="n"&gt;createdAt&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And the mapper:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.mappers&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.campaigns.proto.TweetProtobuf&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.googlecode.protobuf.format.JsonFormat&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.time.ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetOutboxMapper&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;TweetOutbox&lt;/span&gt; &lt;span class="nf"&gt;toOutboxModel&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;EventType&lt;/span&gt; &lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;TweetProtobuf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Tweet&lt;/span&gt; &lt;span class="n"&gt;tweetProto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;toProto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;aggregate&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;toAggregate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetProto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="nc"&gt;TweetOutbox&lt;/span&gt; &lt;span class="n"&gt;tweetOutbox&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;TweetOutbox&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;tweetOutbox&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setType&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toString&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;tweetOutbox&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setAggregate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;aggregate&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;tweetOutbox&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setAggregateId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;valueOf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getId&lt;/span&gt;&lt;span class="o"&gt;()));&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;tweetOutbox&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;TweetProtobuf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Tweet&lt;/span&gt; &lt;span class="nf"&gt;toProto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TweetProtobuf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Tweet&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newBuilder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setSenderId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSenderId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setText&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getText&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setTimestamp&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTimestamp&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;toInstant&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ZoneOffset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;UTC&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toEpochMilli&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="nf"&gt;toAggregate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TweetProtobuf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Tweet&lt;/span&gt; &lt;span class="n"&gt;tweetProto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;JsonFormat&lt;/span&gt; &lt;span class="n"&gt;jsonFormat&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;JsonFormat&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;jsonFormat&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;printToString&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetProto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;There are some new steps, because we're using the protobuffers we defined earlier. First, we map our tweet dto to a protobuffer, then we map this protobuffer to a json string and then finally set it as the &lt;code&gt;aggregate&lt;/code&gt; field of the outbox.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Tweet Outbox DAO&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.dal.dao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.data.annotation.Repository&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.data.jpa.repository.JpaRepository&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.UUID&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Repository&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;TweetsOutboxDao&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;JpaRepository&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TweetOutbox&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;UUID&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Tweet Outbox DAL&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.dal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.dal.dao.TweetsOutboxDao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.mappers.TweetOutboxMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.models.TweetOutbox&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Singleton&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetsOutboxDal&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TweetsOutboxDao&lt;/span&gt; &lt;span class="n"&gt;tweetsOutboxDao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;sendToOutbox&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;EventType&lt;/span&gt; &lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;TweetOutbox&lt;/span&gt; &lt;span class="n"&gt;tweetOutbox&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;TweetOutboxMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toOutboxModel&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;tweetsOutboxDao&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;save&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetOutbox&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Service&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;We need to modify our Tweets Service to send the tweet events to outbox whenever a tweet is published:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.services&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.dtos.PostTweetRequestDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.mappers.TweetRequestMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dal.TweetsDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.dal.TweetsOutboxDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.outbox.dtos.EventType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Singleton&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetsService&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TweetsDal&lt;/span&gt; &lt;span class="n"&gt;tweetsDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Inject&lt;/span&gt;
  &lt;span class="nc"&gt;TweetsOutboxDal&lt;/span&gt; &lt;span class="n"&gt;tweetsOutboxDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="nf"&gt;publishTweet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;PostTweetRequestDto&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;TweetRequestMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromPostRequest&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;persistedTweetDto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;tweetsDal&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;persistTweet&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="n"&gt;tweetsOutboxDal&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;sendToOutbox&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;persistedTweetDto&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;EventType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;PUBLISH_TWEET&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;persistedTweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Processing tweet events
&lt;/h2&gt;

&lt;p&gt;Now that we are able to publish tweets and have it to trigger a tweet event to be published to kafka, we'll implement the event-driven part of our architecture, the one that receives these events and then performs an action. &lt;/p&gt;

&lt;p&gt;In our case, we'll:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;consume tweet events from kafka&lt;/li&gt;
&lt;li&gt;add these events to the author's followers timelines&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;To do that, we'll need:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;a kafka consumer for tweet events&lt;/li&gt;
&lt;li&gt;an abstraction for timelines&lt;/li&gt;
&lt;li&gt;a way to persist the timelines&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We could use our postgres instance to persist the timelines, but we're interested in some sort of cache from where we can retrieve these data rapidly, which seems to be a good fit for &lt;a href="https://redis.io/" rel="noopener noreferrer"&gt;Redis&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;We'll implement this in two parts:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;The consumer, with two components:

&lt;ul&gt;
&lt;li&gt;a listener: that will be connected to Kafka and will receive the events from a topic&lt;/li&gt;
&lt;li&gt;a processor: that will process the event received at the listener&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;A DAL for the timelines, that will be an abstraction for our Redis instance. This DAL will have the same components from other DALs we already created for this project.&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Timelines DAL
&lt;/h3&gt;

&lt;p&gt;This component is an abstraction for Redis, and we'll use it to persist the user's timelines. &lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Protocol schemas&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;In order to do this, we need to define a schema for how we'll store the data into Redis.&lt;/p&gt;

&lt;p&gt;For the key, we'll simply use the &lt;code&gt;user_id&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;For the value part, we'll use a byte array representation of a protobuffer:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;syntax = "proto3";

package com.georgeoliveira.tweets.proto;
option java_outer_classname = "TimelineProtobuf";

import "tweets/tweet.proto";

message Timeline {
    int64 user_id = 1;
    repeated com.georgeoliveira.tweets.proto.Tweet tweets = 2;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  DTO
&lt;/h4&gt;

&lt;p&gt;We'll use Lombok to create our Timeline DTO with a builder:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dtos&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.List&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Builder&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Builder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;toBuilder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@Value&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TimelineDto&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;tweetsList&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Mappers
&lt;/h4&gt;

&lt;p&gt;Here is our mapper that creates &lt;code&gt;TimelineDto&lt;/code&gt; from a list of &lt;code&gt;TweetDto&lt;/code&gt; and maps &lt;code&gt;TimelineDto&lt;/code&gt; to byte arrays.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.mappers&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.proto.TimelineProtobuf&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.List&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.stream.Collectors&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.commons.lang3.tuple.Pair&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TimelineMapper&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;TimelineDto&lt;/span&gt; &lt;span class="nf"&gt;fromList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;tweetsList&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TimelineDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;tweetsList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetsList&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Pair&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;toUserIdTimelineByteArrayPair&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimelineDto&lt;/span&gt; &lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Pair&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getUserId&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;toTimelineByteArray&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="nf"&gt;toTimelineByteArray&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimelineDto&lt;/span&gt; &lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;toProto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;toByteArray&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;TimelineProtobuf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Timeline&lt;/span&gt; &lt;span class="nf"&gt;toProto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimelineDto&lt;/span&gt; &lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TimelineProtobuf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Timeline&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newBuilder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setUserId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getUserId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addAllTweets&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;timelineDto&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTweetsList&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;TweetMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toProto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
                &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collectors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;()))&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  DAO
&lt;/h4&gt;

&lt;p&gt;To access Redis, we'll use the &lt;a href="https://lettuce.io" rel="noopener noreferrer"&gt;Lettuce Client.&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;First, we'll define a &lt;code&gt;[TimelineCommands&lt;/code&gt; interface](&lt;a href="https://lettuce.io/core/release/reference/index.html#redis-command-interfaces" rel="noopener noreferrer"&gt;https://lettuce.io/core/release/reference/index.html#redis-command-interfaces&lt;/a&gt;), that defines the methods that we'll use to interact with Redis:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dal.dao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.lettuce.core.dynamic.Commands&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.lettuce.core.dynamic.annotation.Command&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;TimelineCommands&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;Commands&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Command&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"SET"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;set&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;timelineByteArray&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="nd"&gt;@Command&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"GET"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we'll define a Factory that will set up a Singleton with a instance of that interface that is connected to our Redis:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dal.dao.factories&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dal.dao.TimelineCommands&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.lettuce.core.RedisClient&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.lettuce.core.dynamic.RedisCommandFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.context.annotation.Factory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.context.annotation.Value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Factory&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TimelineCommandsFactory&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${redis.host}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="no"&gt;REDIS_HOST&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Singleton&lt;/span&gt;
  &lt;span class="nc"&gt;TimelineCommands&lt;/span&gt; &lt;span class="nf"&gt;timelineCommands&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;RedisClient&lt;/span&gt; &lt;span class="n"&gt;redisClient&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;RedisClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;create&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;REDIS_HOST&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;RedisCommandFactory&lt;/span&gt; &lt;span class="n"&gt;commandFactory&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;RedisCommandFactory&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;redisClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;connect&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;commandFactory&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getCommands&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimelineCommands&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  DAL
&lt;/h4&gt;

&lt;p&gt;Finally, our DAL that wraps the other components is:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dal.dao.TimelineCommands&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.mappers.TimelineMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.commons.lang3.tuple.Pair&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Singleton&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TimelinesDal&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TimelineCommands&lt;/span&gt; &lt;span class="n"&gt;timelineCommandsDao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;persistTimeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TimelineDto&lt;/span&gt; &lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Pair&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;userIdTimelinePair&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
        &lt;span class="nc"&gt;TimelineMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toUserIdTimelineByteArrayPair&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;timelineCommandsDao&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;set&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
        &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;valueOf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userIdTimelinePair&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getLeft&lt;/span&gt;&lt;span class="o"&gt;()),&lt;/span&gt; &lt;span class="n"&gt;userIdTimelinePair&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getRight&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Consuming tweet events
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Processor
&lt;/h3&gt;

&lt;p&gt;The processor is the component that will implement the flow for the fan-out approach described at the beginning of this article:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Post a tweet: find all people that follows the tweet author and insert the new tweet at their home timeline caches.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Let's break down this flow into:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;find all people that follows the tweet author&lt;/li&gt;
&lt;li&gt;insert the new tweet at their timelines&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For the first part, we'll need an abstraction for the User table, so that it can retrieve user followers:&lt;/p&gt;

&lt;p&gt;Here's our &lt;code&gt;User&lt;/code&gt; model:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.models&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.ArrayList&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.List&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Entity&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.FetchType&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.JoinColumn&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.JoinTable&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.OneToMany&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.persistence.Table&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.AccessLevel&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.EqualsAndHashCode&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Getter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.RequiredArgsConstructor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Setter&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.experimental.FieldDefaults&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Entity&lt;/span&gt;
&lt;span class="nd"&gt;@Table&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;schema&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"public"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"users"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@Getter&lt;/span&gt;
&lt;span class="nd"&gt;@Setter&lt;/span&gt;
&lt;span class="nd"&gt;@RequiredArgsConstructor&lt;/span&gt;
&lt;span class="nd"&gt;@FieldDefaults&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;level&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;AccessLevel&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;PRIVATE&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nd"&gt;@EqualsAndHashCode&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;User&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Id&lt;/span&gt; &lt;span class="nd"&gt;@EqualsAndHashCode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Exclude&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@OneToMany&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;fetch&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;FetchType&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;EAGER&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nd"&gt;@JoinTable&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
      &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"follows"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
      &lt;span class="n"&gt;joinColumns&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nd"&gt;@JoinColumn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"followee_id"&lt;/span&gt;&lt;span class="o"&gt;)},&lt;/span&gt;
      &lt;span class="n"&gt;inverseJoinColumns&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;span class="nd"&gt;@JoinColumn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"follower_id"&lt;/span&gt;&lt;span class="o"&gt;)})&lt;/span&gt;
  &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;User&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;followers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ArrayList&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note the &lt;code&gt;followers&lt;/code&gt; field that maps the &lt;code&gt;follows&lt;/code&gt; association table and will allow us to retrieve a user's followers. Also note that we only mapped what is useful to this very specific flow that we are building, and so many features required to model a "real user" were left behind.&lt;/p&gt;

&lt;p&gt;Here's is our DTO for users:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.dtos&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.List&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Builder&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;lombok.Value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Value&lt;/span&gt;
&lt;span class="nd"&gt;@Builder&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;toBuilder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;UserDto&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
  &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;UserDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;followers&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And the mapper:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.mappers&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.dtos.UserDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.models.User&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.stream.Collectors&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;UserMapper&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;UserDto&lt;/span&gt; &lt;span class="nf"&gt;fromModel&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;User&lt;/span&gt; &lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;UserDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;id&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getId&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;username&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getUsername&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;followers&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getFollowers&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;UserMapper:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;fromModel&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collectors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;()))&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And the DAO:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.dal.dao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.models.User&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.data.annotation.Repository&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.data.jpa.repository.JpaRepository&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Repository&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;UsersDao&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nc"&gt;JpaRepository&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;User&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And, finally, the DAL:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.dal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.dal.dao.UsersDao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.dtos.UserDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.mappers.UserMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Collections&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.List&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Singleton&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;UsersDal&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;UsersDao&lt;/span&gt; &lt;span class="n"&gt;usersDao&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;UserDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getUserFollowers&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;usersDao&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;findById&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;UserMapper:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;fromModel&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;UserDto:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;getFollowers&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;orElse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;emptyList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For the second part of the flow, we already defined an DAL that persists timelines to the cache, but we still need to implement a way to retrieve the tweets from the timeline of a particular user.&lt;/p&gt;

&lt;p&gt;To do this, we'll modify our &lt;code&gt;TweetsDal&lt;/code&gt; component by adding the method&lt;code&gt;getTimelineForUser&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getTimelineForUser&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;timelineSize&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;tweetsDao&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;findTimelineTweetsByUserId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timelineSize&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;TweetMapper:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;fromModel&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collectors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and the DAO with a &lt;a href="https://micronaut-projects.github.io/micronaut-data/latest/guide/#nativeQueries" rel="noopener noreferrer"&gt;native query&lt;/a&gt; that retrieves the tweets from the timeline:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;  &lt;span class="nd"&gt;@Query&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
      &lt;span class="n"&gt;value&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
          &lt;span class="s"&gt;"SELECT * FROM tweets t JOIN follows f ON f.followee_id = t.sender_id WHERE f.follower_id = :userId ORDER BY timestamp DESC LIMIT :limit"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
      &lt;span class="n"&gt;nativeQuery&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="kc"&gt;true&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Tweet&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;findTimelineTweetsByUserId&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;limit&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With these methods, we're allowed to retrieve the tweets list for a user's timeline and build the &lt;code&gt;TimelineDto&lt;/code&gt; using the mapper we defined earlier.&lt;/p&gt;

&lt;p&gt;Finally, the processor is:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.worker.processors&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dal.TimelinesDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.mappers.TimelineMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dal.TweetsDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.dal.UsersDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.users.dtos.UserDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.List&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.function.Consumer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.stream.Collectors&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Singleton&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetsProcessor&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;Consumer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TimelinesDal&lt;/span&gt; &lt;span class="n"&gt;timelinesDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TweetsDal&lt;/span&gt; &lt;span class="n"&gt;tweetsDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;UsersDal&lt;/span&gt; &lt;span class="n"&gt;usersDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Override&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;accept&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;UserDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;authorFollowers&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;usersDal&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getUserFollowers&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSenderId&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TimelineDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;timelineDtos&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
        &lt;span class="n"&gt;authorFollowers&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;UserDto:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;getId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;userId&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt;
                    &lt;span class="nc"&gt;TimelineMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tweetsDal&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTimelineForUser&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;100L&lt;/span&gt;&lt;span class="o"&gt;)))&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;collect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collectors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;toList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

    &lt;span class="n"&gt;timelineDtos&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;forEach&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineDto&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;timelinesDal&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;persistTimeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Listener
&lt;/h3&gt;

&lt;p&gt;This component listens to tweet events and, for each one, triggers the processor flow:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.worker.listeners&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.events.Key&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.events.Value&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.mappers.TweetMapper&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.worker.processors.TweetsProcessor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.configuration.kafka.annotation.KafkaListener&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.configuration.kafka.annotation.OffsetReset&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.configuration.kafka.annotation.Topic&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.io.IOException&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;offsetReset&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;OffsetReset&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;EARLIEST&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TweetsListener&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TweetsProcessor&lt;/span&gt; &lt;span class="n"&gt;tweetsProcessor&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Topic&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${topics.tweets}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;listen&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Key&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Value&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;IOException&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;TweetDto&lt;/span&gt; &lt;span class="n"&gt;tweetDto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;TweetMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromRecord&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;tweetsProcessor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;accept&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note the &lt;code&gt;offsetReset&lt;/code&gt; strategy defined as &lt;code&gt;OffsetReset.EARLIEST&lt;/code&gt; which makes the listener that a consumer will start reading the earliest available records for the topic. More details on offset management are available &lt;a href="https://docs.confluent.io/platform/current/clients/consumer.html#offset-management" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Retrieving timelines
&lt;/h2&gt;

&lt;p&gt;Let's define a route that will allow us to retrieve timeline tweets, as we said earlier at the second point of the fan-out approach:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Home timeline: just read from the cache.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;🤷&lt;/p&gt;

&lt;p&gt;To "just read from the cache", a few components are required. The structure is the same we used when building the tweets API.&lt;/p&gt;

&lt;h3&gt;
  
  
  Service
&lt;/h3&gt;

&lt;p&gt;We first need to add a &lt;code&gt;fromByteArray&lt;/code&gt; method to the &lt;code&gt;TimelineMapper&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;static&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TimelineDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;fromByteArray&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;timelineByteArray&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="nc"&gt;TimelineProtobuf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Timeline&lt;/span&gt; &lt;span class="n"&gt;timelineProto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
          &lt;span class="nc"&gt;TimelineProtobuf&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Timeline&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;parseFrom&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineByteArray&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
      &lt;span class="nc"&gt;TimelineDto&lt;/span&gt; &lt;span class="n"&gt;timelineDto&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;fromProto&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineProto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineDto&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;InvalidProtocolBufferException&lt;/span&gt; &lt;span class="o"&gt;|&lt;/span&gt; &lt;span class="nc"&gt;NullPointerException&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;empty&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This method will be used by the new method that we'll add to the &lt;code&gt;TimelinesDal&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TimelineDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getUserTimeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="o"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;timelineByteArray&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;timelineCommandsDao&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;valueOf&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;TimelineMapper&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromByteArray&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timelineByteArray&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, this new method will be used by the service:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.services&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dal.TimelinesDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.timelines.dtos.TimelineDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.Collections&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.List&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Singleton&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Singleton&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TimelinesService&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TimelinesDal&lt;/span&gt; &lt;span class="n"&gt;timelinesDal&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getUserTimelineTweets&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;timelinesDal&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getUserTimeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nl"&gt;TimelineDto:&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;getTweetsList&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
        &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;orElse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;emptyList&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Controller
&lt;/h3&gt;

&lt;p&gt;The controller uses the service to retrieve a list of tweets of a timeline from the requested user:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;&lt;span class="kn"&gt;package&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.controllers&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.api.services.TimelinesService&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;com.georgeoliveira.tweets.common.tweets.dtos.TweetDto&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.HttpResponse&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.HttpStatus&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.annotation.Controller&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.annotation.Get&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;io.micronaut.http.annotation.PathVariable&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;java.util.List&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;javax.inject.Inject&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Controller&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"/timelines"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TimelinesController&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@Inject&lt;/span&gt; &lt;span class="nc"&gt;TimelinesService&lt;/span&gt; &lt;span class="n"&gt;timelinesService&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Get&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"/{userId}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="nc"&gt;HttpResponse&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;getUserTimeline&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@PathVariable&lt;/span&gt; &lt;span class="nc"&gt;Long&lt;/span&gt; &lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;List&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;TweetDto&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;tweetDtoList&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;timelinesService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getUserTimelineTweets&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;userId&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDtoList&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;isEmpty&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;HttpResponse&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;noContent&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;HttpResponse&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;status&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;HttpStatus&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;FOUND&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;body&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tweetDtoList&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let's see it working by building the project:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;./gradlew build &lt;span class="nt"&gt;-x&lt;/span&gt; &lt;span class="nb"&gt;test&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then running it:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;java &lt;span class="nt"&gt;-jar&lt;/span&gt; build/libs/tweets-all.jar 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then setting up another user to follow the one we created earlier:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;tweets&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="c"&gt;# insert into users(username) values('cool_user');&lt;/span&gt;
INSERT 0 1
&lt;span class="nv"&gt;tweets&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="c"&gt;# insert into follows(follower_id, followee_id) values(2, 1);&lt;/span&gt;
INSERT 0 1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then publishing a tweet from user &lt;code&gt;1&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl &lt;span class="nt"&gt;-L&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; POST &lt;span class="s1"&gt;'http://localhost:8080/tweets'&lt;/span&gt; &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s1"&gt;'Content-Type: application/json'&lt;/span&gt; &lt;span class="nt"&gt;--data-raw&lt;/span&gt; &lt;span class="s1"&gt;'{
    "sender_id": 1,
    "text": "my new tweet",
    "timestamp": 1619626150979
}
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And finally retrieving the timeline for user &lt;code&gt;2&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nv"&gt;$ &lt;/span&gt;curl http://localhost:8080/timelines/2                                        
&lt;span class="o"&gt;[{&lt;/span&gt;&lt;span class="s2"&gt;"id"&lt;/span&gt;:2,&lt;span class="s2"&gt;"text"&lt;/span&gt;:&lt;span class="s2"&gt;"my new tweet"&lt;/span&gt;,&lt;span class="s2"&gt;"timestamp"&lt;/span&gt;:[2021,4,28,16,9,10,979000000],&lt;span class="s2"&gt;"senderId"&lt;/span&gt;:1&lt;span class="o"&gt;}]&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Conclusions
&lt;/h2&gt;

&lt;p&gt;We just created a event-driven app using Micronaut, Kafka and Debezium using many patterns applied in "real-world" applications.&lt;/p&gt;

&lt;p&gt;We also saw many of the complexities added to our project because we chose to use Kafka as a broker. For smaller applications, that may be ok, but for "greater" and more complex ones, might be better to consider &lt;a href="https://diogojoma.medium.com/event-sourcing-why-using-a-message-broker-is-a-bad-idea-ddc11089c876" rel="noopener noreferrer"&gt;creating an event store of your own&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>java</category>
      <category>debezium</category>
      <category>kafka</category>
      <category>postgres</category>
    </item>
  </channel>
</rss>
