<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Oliver Broughton</title>
    <description>The latest articles on DEV Community by Oliver Broughton (@olibroughton).</description>
    <link>https://dev.to/olibroughton</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F215174%2F4d5e7b69-5ecc-4013-b1f4-eda352ed0360.png</url>
      <title>DEV Community: Oliver Broughton</title>
      <link>https://dev.to/olibroughton</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/olibroughton"/>
    <language>en</language>
    <item>
      <title>Building a Scalable Live Stream Chat Service with Spring WebFlux, Redis PubSub, RSocket and Auth0</title>
      <dc:creator>Oliver Broughton</dc:creator>
      <pubDate>Fri, 20 Aug 2021 12:46:36 +0000</pubDate>
      <link>https://dev.to/olibroughton/building-a-scalable-live-stream-chat-service-with-spring-webflux-redis-pubsub-rsocket-and-auth0-22o9</link>
      <guid>https://dev.to/olibroughton/building-a-scalable-live-stream-chat-service-with-spring-webflux-redis-pubsub-rsocket-and-auth0-22o9</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;This post will explore how to build a single real-time chat room similar to Twitch chat. The focus will be on developing a service that can scale horizontally to maintain availability and an increase in demand. This service will be developed with Java Spring Boot.&lt;/p&gt;

&lt;p&gt;Feel free to check the repository on &lt;a href="https://github.com/oli-broughton/livestream-chat" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Requirements
&lt;/h2&gt;

&lt;p&gt;As the main goal of this post will be to explore real-time chat scalability, chatroom functionality will be kept to a minimum.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Users will be able to join a single global chat room. There will not be rooms to select.&lt;/li&gt;
&lt;li&gt;To simplify the service messages will only be text.&lt;/li&gt;
&lt;li&gt;Similar to twitch, users will only be able to see chat messages from when they joined the room. There will be no chat history.&lt;/li&gt;
&lt;li&gt;Users will be able to view messages anonymous but will only be able to send messages to the chat once they have signed up with a unique username.&lt;/li&gt;
&lt;li&gt;Message senders will be identified by username only.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Architecture
&lt;/h2&gt;

&lt;p&gt;To facilitate real-time communication we will be using the WebSocket Protocol to transport messages between clients and the service.  To horizontally scale, multiple server instances will be needed so new WebSocket connections can be load balanced between them.&lt;/p&gt;

&lt;p&gt;With WebSocket connections load-balanced between the instances, we can utilise a message broker to fan-out messages to all of the clients. For this project, we will be brokering messages with &lt;a href="https://redis.io/topics/pubsub" rel="noopener noreferrer"&gt;Redis PubSub&lt;/a&gt;. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fl42j9hn1i6rop0h8kr8d.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fl42j9hn1i6rop0h8kr8d.png" alt="Architecture Diagram"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Redis PubSub
&lt;/h2&gt;

&lt;p&gt;Redis PubSub will allow for a high throughput of messages to be fanned out to its subscribers with minimal latency.  It can also be distributed to multiple instances to increase throughput and redundancy. &lt;a href="https://www.youtube.com/watch?v=6G22a5Iooqk" rel="noopener noreferrer"&gt;Here&lt;/a&gt; is a great talk by Shahar Mor that goes into more detail.&lt;/p&gt;

&lt;h3&gt;
  
  
  Limitations
&lt;/h3&gt;

&lt;p&gt;One trade-off with Redis PubSub is that it only "fires and forgets" to subscribers and offers no way to guarantee message delivery. This could result in messages being dropped if a server instance is restarted and users have to reconnect. &lt;/p&gt;

&lt;p&gt;Adding guaranteed message delivery is outside the scope of this post. The argument could also be made that infrequent message drops for some users would not affect the experience of a live stream chat.&lt;/p&gt;

&lt;p&gt;If you require a higher degree of message reliability it is worth checking out other message brokers such as &lt;a href="https://kafka.apache.org/" rel="noopener noreferrer"&gt;Kafka&lt;/a&gt;, &lt;a href="https://www.rabbitmq.com/" rel="noopener noreferrer"&gt;RabbitMQ&lt;/a&gt; or even &lt;a href="https://redis.io/topics/streams-intro" rel="noopener noreferrer"&gt;Redis Streams&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Testing
&lt;/h2&gt;

&lt;p&gt;In a later post, we will be developing a React client that will connect to our chat service but until then we will be checking that the requirements are met by developing integration tests within the Spring framework.&lt;/p&gt;

&lt;p&gt;We will use the &lt;a href="https://github.com/Playtika/testcontainers-spring-boot" rel="noopener noreferrer"&gt;Playtika testcontainers-springboot&lt;/a&gt; library to easily spin up a Redis docker container while running the integration tests locally. This can then be used as our PubSub message broker as we test the functionality of the connection server.&lt;/p&gt;

&lt;p&gt;All integration tests used to meet the requirements can be found &lt;a href="https://github.com/oli-broughton/livestream-chat/blob/e8ff45dc6eda5054bd70f3e5e891a01fbe4ad4f6/src/test/java/com/livestreamchat/pubsub/PubSubControllerITests.java" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Implementation
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Warning!&lt;/strong&gt; There is a lot of areas to cover so this will be a bit of an info dump. I am always happy to answer questions to clear up any details.  &lt;/p&gt;

&lt;h3&gt;
  
  
  Spring WebFlux
&lt;/h3&gt;

&lt;p&gt;We will be utilising the non-blocking Spring WebFlux framework for developing the connection server. This will help to reduce the latency of having a high number of connections. WebFlux will also allow us to utilise Project Reactor's Reactive Streams implementation to write asynchronous code that is easier to reason about.&lt;/p&gt;

&lt;p&gt;For information on the advantages of reactive programming feel free to visit the &lt;a href="https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html#webflux-new-framework" rel="noopener noreferrer"&gt;Spring&lt;/a&gt; or &lt;a href="https://projectreactor.io/docs/core/release/reference/#_blocking_can_be_wasteful" rel="noopener noreferrer"&gt;Project Reactor&lt;/a&gt; sites.&lt;/p&gt;

&lt;p&gt;To use Spring WebFlux this dependency is required. &lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;

&lt;span class="n"&gt;implementation&lt;/span&gt; &lt;span class="s1"&gt;'org.springframework.boot:spring-boot-starter-webflux'&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Maybe link guide to setup and more info on reactive programming&lt;/p&gt;

&lt;h3&gt;
  
  
  PubSub Service
&lt;/h3&gt;

&lt;p&gt;We will be abstracting the PubSub functionality into a service with a common message class and interface.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Data&lt;/span&gt;  
&lt;span class="nd"&gt;@AllArgsConstructor&lt;/span&gt;
&lt;span class="nd"&gt;@NoArgsConstructor&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;Message&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;PubSubService&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Mono&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Message&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; 
    &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt; 
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  Redis Configuration
&lt;/h3&gt;

&lt;p&gt;To interface with the Redis server, we will need the Spring Boot Redis dependency. &lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;

&lt;span class="n"&gt;implementation&lt;/span&gt; &lt;span class="s1"&gt;'org.springframework.boot:spring-boot-starter-data-redis'&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Redis will also have to be configured so that we can utilise its reactive API. We will be adding the necessary configuration beans into the RedisPubSubConfig class.&lt;/p&gt;

&lt;p&gt;The ReactiveRedisTemplate bean is required to publish messages and is configured so it can serialise Message objects into JSON.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;ReactiveRedisTemplate&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;reactiveRedisTemplate&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ReactiveRedisConnectionFactory&lt;/span&gt; &lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

        &lt;span class="nc"&gt;StringRedisSerializer&lt;/span&gt; &lt;span class="n"&gt;keySerializer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;StringRedisSerializer&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="nc"&gt;Jackson2JsonRedisSerializer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;valueSerializer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; 
                        &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Jackson2JsonRedisSerializer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;class&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="nc"&gt;RedisSerializationContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;RedisSerializationContextBuilder&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
            &lt;span class="nc"&gt;RedisSerializationContext&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;newSerializationContext&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;keySerializer&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="nc"&gt;RedisSerializationContext&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
            &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;valueSerializer&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ReactiveRedisTemplate&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A ReactiveRedisMessageListenerContainer bean is needed for subscribing to message channels.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="nc"&gt;ReactiveRedisMessageListenerContainer&lt;/span&gt; &lt;span class="nf"&gt;container&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ReactiveRedisConnectionFactory&lt;/span&gt; &lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;ReactiveRedisMessageListenerContainer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  Redis PubSub Service
&lt;/h3&gt;

&lt;p&gt;We can now create the RedisPubSubService by implementing the PubSubService interface and injecting the configured Redis beans. &lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Service&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RedisPubSubService&lt;/span&gt; &lt;span class="kd"&gt;implements&lt;/span&gt; &lt;span class="nc"&gt;PubSubService&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;ReactiveRedisTemplate&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;reactiveTemplate&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;ReactiveRedisMessageListenerContainer&lt;/span&gt; &lt;span class="n"&gt;reactiveMsgListenerContainer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;ChannelTopic&lt;/span&gt; &lt;span class="n"&gt;channelTopic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ChannelTopic&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"broadcast"&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt; &lt;span class="c1"&gt;// channel used to send and recieve messages&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;RedisPubSubService&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ReactiveRedisTemplate&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;reactiveTemplate&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
                              &lt;span class="nc"&gt;ReactiveRedisMessageListenerContainer&lt;/span&gt; &lt;span class="n"&gt;reactiveMsgListenerContainer&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;reactiveMsgListenerContainer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;reactiveMsgListenerContainer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;reactiveTemplate&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;reactiveTemplate&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The publish interface method uses the Redis Template to serialise and publish the message to topic "broadcast". An empty Mono is returned to signal that message has been sent successfully.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Override&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Mono&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Message&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;reactiveTemplate&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;convertAndSend&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;channelTopic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getTopic&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;then&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Mono&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;empty&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Subscribing to the channel is a bit more verbose and requires that we pass the channel as an Iterable. We also have to manually pass the SerializationContext from the Redis Template to deserialise incoming messages.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Override&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;reactiveMsgListenerContainer&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;receive&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Collections&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;singletonList&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;channelTopic&lt;/span&gt;&lt;span class="o"&gt;),&lt;/span&gt;
                    &lt;span class="n"&gt;reactiveTemplate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSerializationContext&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getKeySerializationPair&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt;
                    &lt;span class="n"&gt;reactiveTemplate&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getSerializationContext&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;getValueSerializationPair&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ReactiveSubscription&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;::&lt;/span&gt;&lt;span class="n"&gt;getMessage&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We now have a functional PubSub service that can broadcast messages between the connection server instances. &lt;/p&gt;

&lt;p&gt;Unit tests for this service can be found &lt;a href="https://github.com/oli-broughton/livestream-chat/blob/e8ff45dc6eda5054bd70f3e5e891a01fbe4ad4f6/src/test/java/com/livestreamchat/pubsub/RedisPubSubServiceTest.java" rel="noopener noreferrer"&gt;here&lt;/a&gt;. &lt;/p&gt;

&lt;h3&gt;
  
  
  RSocket
&lt;/h3&gt;

&lt;p&gt;For connecting clients to the chat service, we will be using the RSocket protocol on top of a WebSocket connection. RSocket allows us to utilise the Reactive Streams standard over a network boundary and has some useful features such as backpressure and routing. &lt;/p&gt;

&lt;p&gt;Backpressure will be particularly useful for our client application if we have a high surge of messages we can rate-limit the number of messages the client accepts.&lt;/p&gt;

&lt;p&gt;To utilise RSocket we need to include the Spring Boot RSocket dependency: &lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;

&lt;span class="n"&gt;implementation&lt;/span&gt; &lt;span class="s1"&gt;'org.springframework.boot:spring-boot-starter-rsocket'&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We will also need to configure RSocket to use Websockets and to set an endpoint for clients to connect to.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="n"&gt;spring&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;rsocket&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;server&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;transport&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;websocket&lt;/span&gt;
&lt;span class="n"&gt;spring&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;rsocket&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;server&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;mapping&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;path&lt;/span&gt;&lt;span class="o"&gt;=/&lt;/span&gt;&lt;span class="n"&gt;rs&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Next, we can implement the PubSubController that is responsible for handling RSocket messages from the client. This controller injects the PubSubService we created earlier. &lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Controller&lt;/span&gt;
&lt;span class="nd"&gt;@Slf4j&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;PubSubController&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

        &lt;span class="kd"&gt;private&lt;/span&gt; &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="nc"&gt;PubSubService&lt;/span&gt; &lt;span class="n"&gt;messagingService&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

    &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;PubSubController&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;PubSubService&lt;/span&gt; &lt;span class="n"&gt;messagingService&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;this&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;messagingService&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;messagingService&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;// .....&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;RSocket Spring allows you to utilise two helpful annotation methods : &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;@ConnectMapping - used to handle when a new connection is made.&lt;/li&gt;
&lt;li&gt;@MessageMapping - used to handle messages for a particular route.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We will utilise @ConnectMapping to log when new connections are made or closed.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@ConnectMapping&lt;/span&gt;
&lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;onConnect&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RSocketRequester&lt;/span&gt; &lt;span class="n"&gt;requester&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;Objects&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;requireNonNull&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;requester&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;rsocket&lt;/span&gt;&lt;span class="o"&gt;(),&lt;/span&gt; &lt;span class="s"&gt;"rsocket connection should not be null"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;onClose&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;doOnError&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;error&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;warn&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;requester&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;rsocketClient&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;" Closed"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;doFinally&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;info&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;requester&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;rsocketClient&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="s"&gt;" Disconnected"&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;@MessageMapping("publish") will forward sent messages to the PubSubService.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@MessageMapping&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"publish"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nc"&gt;Mono&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Message&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;messagingService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;publish&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;@MessageMapping("subscribe") will stream the messages from the PubSubService to the client.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@MessageMapping&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"subscribe"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="nc"&gt;Flux&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;messagingService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;subscribe&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  Auth0 Authentication
&lt;/h3&gt;

&lt;p&gt;We will be using the 3rd party service &lt;a href="https://auth0.com/" rel="noopener noreferrer"&gt;Auth0&lt;/a&gt; for user sign up and authentication. The &lt;a href="https://jwt.io/" rel="noopener noreferrer"&gt;JWT&lt;/a&gt; user access tokens provided by Auth0 are also compatible with RSocket. This will allow us to secure individual messaging routes. &lt;/p&gt;

&lt;p&gt;A great tutorial on using Auth0 with Spring Boot can be found &lt;a href="https://auth0.com/docs/quickstart/backend/java-spring-security5/01-authorization" rel="noopener noreferrer"&gt;here&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;The unique username entered at sign up will be encoded into the access token and decoded whenever a message is sent. This removes the need to store the username in the chat service. Due to this encoding, the access token should be short-lived to reduce the risk of being compromised. &lt;/p&gt;

&lt;p&gt;To be able to decode and verify JWT tokens in our app we will need this dependency.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;

&lt;span class="n"&gt;implementation&lt;/span&gt; &lt;span class="s1"&gt;'org.springframework.boot:spring-boot-starter-oauth2-resource-server'&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We can then configure the authentication server endpoint and add properties for the correct auth0 audience and the custom claim used to encode the username.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="n"&gt;spring&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;security&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;oauth2&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;resourceserver&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;jwt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;issuer&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nl"&gt;https:&lt;/span&gt;&lt;span class="c1"&gt;//{YOUR_AUTH0_DOMAIN}.us.auth0.com/&lt;/span&gt;
&lt;span class="n"&gt;auth0&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;audience&lt;/span&gt;&lt;span class="o"&gt;={&lt;/span&gt;&lt;span class="no"&gt;YOUR_AUTH0_API_AUDIENCE&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;auth0&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;username&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;claim&lt;/span&gt;&lt;span class="o"&gt;={&lt;/span&gt;&lt;span class="no"&gt;YOUR_AUTH0_USERNAME_CLAIM&lt;/span&gt;&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h3&gt;
  
  
  Securing RSocket
&lt;/h3&gt;

&lt;p&gt;To secure RSocket include the RSocket Security dependency.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;

&lt;span class="n"&gt;implementation&lt;/span&gt; &lt;span class="s1"&gt;'org.springframework.security:spring-security-rsocket'&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;To be able to pass the decoded JWT token as an &lt;a href="https://docs.spring.io/spring-security/site/docs/current/api/org/springframework/security/core/annotation/AuthenticationPrincipal.html" rel="noopener noreferrer"&gt;AuthenticationPrinciple&lt;/a&gt; we will need the spring security messaging dependency.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight groovy"&gt;&lt;code&gt;

&lt;span class="n"&gt;implementation&lt;/span&gt; &lt;span class="err"&gt;'&lt;/span&gt;&lt;span class="n"&gt;org&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;springframework&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;security&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;&lt;span class="n"&gt;spring&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;security&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;messaging&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We can now create the RSocketSecurityConfig class by injecting the auth0 properties added earlier. We can also add the @EnableRSocketSecurity and @EnableReactiveMethodSecurtiy annotations to active Security support.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Configuration&lt;/span&gt;
&lt;span class="nd"&gt;@EnableRSocketSecurity&lt;/span&gt;
&lt;span class="nd"&gt;@EnableReactiveMethodSecurity&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RSocketSecurityConfig&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${auth0.audience}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;audience&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${spring.security.oauth2.resourceserver.jwt.issuer-uri}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;issuer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${auth0.username-claim}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
    &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;usernameClaim&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;//......&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;To decode the JWT token we will need to create a &lt;a href="https://docs.spring.io/spring-security/site/docs/current/api/org/springframework/security/oauth2/jwt/ReactiveJwtDecoder.html" rel="noopener noreferrer"&gt;ReactiveJwtDecoder&lt;/a&gt; bean. Custom &lt;a href="https://docs.spring.io/spring-security/site/docs/current/api/org/springframework/security/oauth2/core/OAuth2TokenValidator.html" rel="noopener noreferrer"&gt;OAuth2TokenValidators&lt;/a&gt; are also added to check that the audience and username claims are included in the token.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;ReactiveJwtDecoder&lt;/span&gt; &lt;span class="nf"&gt;reactiveJwtDecoder&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;reactiveJwtDecoder&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;NimbusReactiveJwtDecoder&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nc"&gt;ReactiveJwtDecoders&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;fromOidcIssuerLocation&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;issuer&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="nc"&gt;OAuth2TokenValidator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Jwt&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;audienceValidator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jwt&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;OAuth2Error&lt;/span&gt; &lt;span class="n"&gt;error&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;OAuth2Error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"invalid_token"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"The required audience is missing"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jwt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getAudience&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;contains&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;audience&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;OAuth2TokenValidatorResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;success&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;OAuth2TokenValidatorResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;failure&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;error&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;};&lt;/span&gt;

    &lt;span class="nc"&gt;OAuth2TokenValidator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Jwt&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;usernameValidator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jwt&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
        &lt;span class="nc"&gt;OAuth2Error&lt;/span&gt; &lt;span class="n"&gt;error&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;OAuth2Error&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"invalid_token"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"The required username is missing"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jwt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getClaimAsString&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;usernameClaim&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;OAuth2TokenValidatorResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;success&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
        &lt;span class="o"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nc"&gt;OAuth2TokenValidatorResult&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;failure&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;error&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;};&lt;/span&gt;

    &lt;span class="nc"&gt;OAuth2TokenValidator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Jwt&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;withIssuer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;JwtValidators&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;createDefaultWithIssuer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;issuer&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="nc"&gt;OAuth2TokenValidator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Jwt&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;compositeValidator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;DelegatingOAuth2TokenValidator&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;withIssuer&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;audienceValidator&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;usernameValidator&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="n"&gt;reactiveJwtDecoder&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setJwtValidator&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;compositeValidator&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;reactiveJwtDecoder&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;To prevent unauthorised users from sending messages we can secure the "publish" route by configuring a PayloadSocketAcceptorInterceptor bean to require JWT tokens.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;PayloadSocketAcceptorInterceptor&lt;/span&gt; &lt;span class="nf"&gt;rsocketInterceptor&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RSocketSecurity&lt;/span&gt; &lt;span class="n"&gt;rSocketSecurity&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;rSocketSecurity&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;authorizePayload&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;authorize&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt;
                    &lt;span class="n"&gt;authorize&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;route&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"publish"&lt;/span&gt;&lt;span class="o"&gt;).&lt;/span&gt;&lt;span class="na"&gt;authenticated&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; 
                            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;anyExchange&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;permitAll&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt; &lt;span class="c1"&gt;// everything else is permitted&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;jwt&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Customizer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;withDefaults&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;build&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;An RSocketMessageHandler bean is also configured to allow an AuthenticationPrincple to be resolved in RSocket handler methods.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="nc"&gt;RSocketMessageHandler&lt;/span&gt; &lt;span class="nf"&gt;messageHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;RSocketStrategies&lt;/span&gt; &lt;span class="n"&gt;strategies&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;RSocketMessageHandler&lt;/span&gt; &lt;span class="n"&gt;mh&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;RSocketMessageHandler&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
    &lt;span class="n"&gt;mh&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getArgumentResolverConfigurer&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;addCustomResolver&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
            &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;AuthenticationPrincipalArgumentResolver&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;mh&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setRouteMatcher&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;PathPatternRouteMatcher&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
    &lt;span class="n"&gt;mh&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setRSocketStrategies&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;strategies&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;mh&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;We can now access the decoded username claim when handling a message. &lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;p&gt;&lt;span class="nd"&gt;@Value&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"${auth0.username-claim}"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;br&gt;
&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;usernameClaim&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="c1"&gt;//......&lt;/span&gt;&lt;/p&gt;

&lt;p&gt;&lt;span class="nd"&gt;@MessageMapping&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"publish"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;br&gt;
&lt;span class="nc"&gt;Mono&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Void&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;publish&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nd"&gt;@AuthenticationPrincipal&lt;/span&gt; &lt;span class="nc"&gt;Mono&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;Jwt&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;&lt;br&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;jwt&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;jwt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getClaimAsString&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;usernameClaim&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;&lt;br&gt;
            &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;flatMap&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;username&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;messagingService&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;publish&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;Message&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;username&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="o"&gt;)));&lt;/span&gt;&lt;br&gt;
&lt;span class="o"&gt;}&lt;/span&gt;&lt;/p&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
&lt;br&gt;
  &lt;br&gt;
  &lt;br&gt;
  Conclusion&lt;br&gt;
&lt;/h2&gt;

&lt;p&gt;This wraps up the overview of the implementation. Again I am always happy to answer any questions about this post or the reasoning behind the technology chosen. &lt;/p&gt;

&lt;p&gt;Feel free to check the full source on &lt;a href="https://github.com/oli-broughton/livestream-chat" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>java</category>
      <category>microservices</category>
    </item>
  </channel>
</rss>
