DEV Community

Oliver Broughton
Oliver Broughton

Posted on

Building a Scalable Live Stream Chat Service with Spring WebFlux, Redis PubSub, RSocket and Auth0

Introduction

This post will explore how to build a single real-time chat room similar to Twitch chat. The focus will be on developing a service that can scale horizontally to maintain availability and an increase in demand. This service will be developed with Java Spring Boot.

Feel free to check the repository on GitHub.

Requirements

As the main goal of this post will be to explore real-time chat scalability, chatroom functionality will be kept to a minimum.

  • Users will be able to join a single global chat room. There will not be rooms to select.
  • To simplify the service messages will only be text.
  • Similar to twitch, users will only be able to see chat messages from when they joined the room. There will be no chat history.
  • Users will be able to view messages anonymous but will only be able to send messages to the chat once they have signed up with a unique username.
  • Message senders will be identified by username only.

Architecture

To facilitate real-time communication we will be using the WebSocket Protocol to transport messages between clients and the service. To horizontally scale, multiple server instances will be needed so new WebSocket connections can be load balanced between them.

With WebSocket connections load-balanced between the instances, we can utilise a message broker to fan-out messages to all of the clients. For this project, we will be brokering messages with Redis PubSub.

Architecture Diagram

Redis PubSub

Redis PubSub will allow for a high throughput of messages to be fanned out to its subscribers with minimal latency. It can also be distributed to multiple instances to increase throughput and redundancy. Here is a great talk by Shahar Mor that goes into more detail.

Limitations

One trade-off with Redis PubSub is that it only "fires and forgets" to subscribers and offers no way to guarantee message delivery. This could result in messages being dropped if a server instance is restarted and users have to reconnect.

Adding guaranteed message delivery is outside the scope of this post. The argument could also be made that infrequent message drops for some users would not affect the experience of a live stream chat.

If you require a higher degree of message reliability it is worth checking out other message brokers such as Kafka, RabbitMQ or even Redis Streams.

Testing

In a later post, we will be developing a React client that will connect to our chat service but until then we will be checking that the requirements are met by developing integration tests within the Spring framework.

We will use the Playtika testcontainers-springboot library to easily spin up a Redis docker container while running the integration tests locally. This can then be used as our PubSub message broker as we test the functionality of the connection server.

All integration tests used to meet the requirements can be found here.

Implementation

Warning! There is a lot of areas to cover so this will be a bit of an info dump. I am always happy to answer questions to clear up any details.

Spring WebFlux

We will be utilising the non-blocking Spring WebFlux framework for developing the connection server. This will help to reduce the latency of having a high number of connections. WebFlux will also allow us to utilise Project Reactor's Reactive Streams implementation to write asynchronous code that is easier to reason about.

For information on the advantages of reactive programming feel free to visit the Spring or Project Reactor sites.

To use Spring WebFlux this dependency is required.

implementation 'org.springframework.boot:spring-boot-starter-webflux'
Enter fullscreen mode Exit fullscreen mode

Maybe link guide to setup and more info on reactive programming

PubSub Service

We will be abstracting the PubSub functionality into a service with a common message class and interface.

@Data  
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    String username;
    String message;
}
Enter fullscreen mode Exit fullscreen mode
public interface PubSubService {
    Mono<Void> publish(Message message); 
    Flux<Message> subscribe(); 
}
Enter fullscreen mode Exit fullscreen mode

Redis Configuration

To interface with the Redis server, we will need the Spring Boot Redis dependency.

implementation 'org.springframework.boot:spring-boot-starter-data-redis'
Enter fullscreen mode Exit fullscreen mode

Redis will also have to be configured so that we can utilise its reactive API. We will be adding the necessary configuration beans into the RedisPubSubConfig class.

The ReactiveRedisTemplate bean is required to publish messages and is configured so it can serialise Message objects into JSON.

@Bean
public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {

        StringRedisSerializer keySerializer = new StringRedisSerializer();
    Jackson2JsonRedisSerializer<Message> valueSerializer = 
                        new Jackson2JsonRedisSerializer<>(Message.class);

    RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
            RedisSerializationContext.newSerializationContext(keySerializer);

    RedisSerializationContext<String, Message> context =
            builder.value(valueSerializer).build();

    return new ReactiveRedisTemplate<>(factory, context);
}
Enter fullscreen mode Exit fullscreen mode

A ReactiveRedisMessageListenerContainer bean is needed for subscribing to message channels.

@Bean
ReactiveRedisMessageListenerContainer container(ReactiveRedisConnectionFactory factory) {
    return new ReactiveRedisMessageListenerContainer(factory);
}
Enter fullscreen mode Exit fullscreen mode

Redis PubSub Service

We can now create the RedisPubSubService by implementing the PubSubService interface and injecting the configured Redis beans.

@Service
public class RedisPubSubService implements PubSubService {

    private final ReactiveRedisTemplate<String, Message> reactiveTemplate;
    private final ReactiveRedisMessageListenerContainer reactiveMsgListenerContainer;

    private final ChannelTopic channelTopic = new ChannelTopic("broadcast"); // channel used to send and recieve messages

    public RedisPubSubService(ReactiveRedisTemplate<String, Message> reactiveTemplate,
                              ReactiveRedisMessageListenerContainer reactiveMsgListenerContainer) {
        this.reactiveMsgListenerContainer = reactiveMsgListenerContainer;
        this.reactiveTemplate = reactiveTemplate;
    }
Enter fullscreen mode Exit fullscreen mode

The publish interface method uses the Redis Template to serialise and publish the message to topic "broadcast". An empty Mono is returned to signal that message has been sent successfully.

@Override
public Mono<Void> publish(Message message) {
    return this.reactiveTemplate
            .convertAndSend(channelTopic.getTopic(), message)
            .then(Mono.empty());
}
Enter fullscreen mode Exit fullscreen mode

Subscribing to the channel is a bit more verbose and requires that we pass the channel as an Iterable. We also have to manually pass the SerializationContext from the Redis Template to deserialise incoming messages.

@Override
public Flux<Message> subscribe() {
    return reactiveMsgListenerContainer
            .receive(Collections.singletonList(channelTopic),
                    reactiveTemplate.getSerializationContext().getKeySerializationPair(),
                    reactiveTemplate.getSerializationContext().getValueSerializationPair())
            .map(ReactiveSubscription.Message::getMessage);
}
Enter fullscreen mode Exit fullscreen mode

We now have a functional PubSub service that can broadcast messages between the connection server instances.

Unit tests for this service can be found here.

RSocket

For connecting clients to the chat service, we will be using the RSocket protocol on top of a WebSocket connection. RSocket allows us to utilise the Reactive Streams standard over a network boundary and has some useful features such as backpressure and routing.

Backpressure will be particularly useful for our client application if we have a high surge of messages we can rate-limit the number of messages the client accepts.

To utilise RSocket we need to include the Spring Boot RSocket dependency:

implementation 'org.springframework.boot:spring-boot-starter-rsocket'
Enter fullscreen mode Exit fullscreen mode

We will also need to configure RSocket to use Websockets and to set an endpoint for clients to connect to.

spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rs
Enter fullscreen mode Exit fullscreen mode

Next, we can implement the PubSubController that is responsible for handling RSocket messages from the client. This controller injects the PubSubService we created earlier.

@Controller
@Slf4j
public class PubSubController {

        private final PubSubService messagingService;

    public PubSubController(PubSubService messagingService) {
        this.messagingService = messagingService;
    }

// .....
Enter fullscreen mode Exit fullscreen mode

RSocket Spring allows you to utilise two helpful annotation methods :

  • @ConnectMapping - used to handle when a new connection is made.
  • @MessageMapping - used to handle messages for a particular route.

We will utilise @ConnectMapping to log when new connections are made or closed.

@ConnectMapping
void onConnect(RSocketRequester requester) {
    Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null")
            .onClose()
            .doOnError(error -> log.warn(requester.rsocketClient() + " Closed"))
            .doFinally(consumer -> log.info(requester.rsocketClient() + " Disconnected"))
            .subscribe();
}
Enter fullscreen mode Exit fullscreen mode

@MessageMapping("publish") will forward sent messages to the PubSubService.

@MessageMapping("publish")
Mono<Void> publish(Message message) {
    return messagingService.publish(message);
}
Enter fullscreen mode Exit fullscreen mode

@MessageMapping("subscribe") will stream the messages from the PubSubService to the client.

@MessageMapping("subscribe")
Flux<Message> subscribe() {
    return messagingService.subscribe();
}
Enter fullscreen mode Exit fullscreen mode

Auth0 Authentication

We will be using the 3rd party service Auth0 for user sign up and authentication. The JWT user access tokens provided by Auth0 are also compatible with RSocket. This will allow us to secure individual messaging routes.

A great tutorial on using Auth0 with Spring Boot can be found here.

The unique username entered at sign up will be encoded into the access token and decoded whenever a message is sent. This removes the need to store the username in the chat service. Due to this encoding, the access token should be short-lived to reduce the risk of being compromised.

To be able to decode and verify JWT tokens in our app we will need this dependency.

implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server'
Enter fullscreen mode Exit fullscreen mode

We can then configure the authentication server endpoint and add properties for the correct auth0 audience and the custom claim used to encode the username.

spring.security.oauth2.resourceserver.jwt.issuer-uri=https://{YOUR_AUTH0_DOMAIN}.us.auth0.com/
auth0.audience={YOUR_AUTH0_API_AUDIENCE}
auth0.username-claim={YOUR_AUTH0_USERNAME_CLAIM}
Enter fullscreen mode Exit fullscreen mode

Securing RSocket

To secure RSocket include the RSocket Security dependency.

implementation 'org.springframework.security:spring-security-rsocket'
Enter fullscreen mode Exit fullscreen mode

To be able to pass the decoded JWT token as an AuthenticationPrinciple we will need the spring security messaging dependency.

implementation 'org.springframework.security:spring-security-messaging
Enter fullscreen mode Exit fullscreen mode

We can now create the RSocketSecurityConfig class by injecting the auth0 properties added earlier. We can also add the @EnableRSocketSecurity and @EnableReactiveMethodSecurtiy annotations to active Security support.

@Configuration
@EnableRSocketSecurity
@EnableReactiveMethodSecurity
public class RSocketSecurityConfig {

    @Value("${auth0.audience}")
    String audience;
    @Value("${spring.security.oauth2.resourceserver.jwt.issuer-uri}")
    String issuer;
    @Value("${auth0.username-claim}")
    String usernameClaim;

//......
Enter fullscreen mode Exit fullscreen mode

To decode the JWT token we will need to create a ReactiveJwtDecoder bean. Custom OAuth2TokenValidators are also added to check that the audience and username claims are included in the token.

@Bean
public ReactiveJwtDecoder reactiveJwtDecoder() {

    var reactiveJwtDecoder = (NimbusReactiveJwtDecoder) ReactiveJwtDecoders.fromOidcIssuerLocation(issuer);

    OAuth2TokenValidator<Jwt> audienceValidator = (jwt) -> {
        OAuth2Error error = new OAuth2Error("invalid_token", "The required audience is missing", null);
        if (jwt.getAudience().contains(audience)) {
            return OAuth2TokenValidatorResult.success();
        }
        return OAuth2TokenValidatorResult.failure(error);
    };

    OAuth2TokenValidator<Jwt> usernameValidator = (jwt) -> {
        OAuth2Error error = new OAuth2Error("invalid_token", "The required username is missing", null);
        if (jwt.getClaimAsString(usernameClaim) != null) {
            return OAuth2TokenValidatorResult.success();
        }
        return OAuth2TokenValidatorResult.failure(error);
    };

    OAuth2TokenValidator<Jwt> withIssuer = JwtValidators.createDefaultWithIssuer(issuer);
    OAuth2TokenValidator<Jwt> compositeValidator = new DelegatingOAuth2TokenValidator<>(withIssuer, audienceValidator, usernameValidator);

    reactiveJwtDecoder.setJwtValidator(compositeValidator);

    return reactiveJwtDecoder;
}
Enter fullscreen mode Exit fullscreen mode

To prevent unauthorised users from sending messages we can secure the "publish" route by configuring a PayloadSocketAcceptorInterceptor bean to require JWT tokens.

@Bean
public PayloadSocketAcceptorInterceptor rsocketInterceptor(RSocketSecurity rSocketSecurity) {
    return rSocketSecurity.authorizePayload(authorize ->
                    authorize.route("publish").authenticated() 
                            .anyExchange().permitAll()) // everything else is permitted
            .jwt(Customizer.withDefaults())
            .build();
}
Enter fullscreen mode Exit fullscreen mode

An RSocketMessageHandler bean is also configured to allow an AuthenticationPrincple to be resolved in RSocket handler methods.

@Bean
RSocketMessageHandler messageHandler(RSocketStrategies strategies) {
    RSocketMessageHandler mh = new RSocketMessageHandler();
    mh.getArgumentResolverConfigurer().addCustomResolver(
            new AuthenticationPrincipalArgumentResolver());
    mh.setRouteMatcher(new PathPatternRouteMatcher());
    mh.setRSocketStrategies(strategies);
    return mh;
}
Enter fullscreen mode Exit fullscreen mode

We can now access the decoded username claim when handling a message.

@Value("${auth0.username-claim}")
String usernameClaim;

//......

@MessageMapping("publish")
Mono<Void> publish(String message, @AuthenticationPrincipal Mono<Jwt> token) {
    return token.map(jwt -> jwt.getClaimAsString(usernameClaim))
            .flatMap(username -> messagingService.publish(new Message(username, message)));
}
Enter fullscreen mode Exit fullscreen mode

Conclusion

This wraps up the overview of the implementation. Again I am always happy to answer any questions about this post or the reasoning behind the technology chosen.

Feel free to check the full source on GitHub.

Top comments (2)

Collapse
 
ndungujan23 profile image
ndungujan23

Salaale! Life saver! Thank you so much

Collapse
 
adimiradolouco profile image
adimiradorlouco

Thanks so much for sharing your knowledge! This article was perfect! Hugs from Brazil!!