Introduction
This post will explore how to build a single real-time chat room similar to Twitch chat. The focus will be on developing a service that can scale horizontally to maintain availability as demand increases. This service will be developed with Java Spring Boot.
Feel free to check the repository on GitHub.
Requirements
As the main goal of this post will be to explore real-time chat scalability, chatroom functionality will be kept to a minimum.
- Users will be able to join a single global chat room. There will not be rooms to select.
- To simplify the service, messages will only be text.
- Similar to Twitch, users will only be able to see chat messages from when they joined the room. There will be no chat history.
- Users will be able to view messages anonymously but will only be able to send messages to the chat once they have signed up with a unique username.
- Message senders will be identified by username only.
Architecture
To facilitate real-time communication we will be using the WebSocket Protocol to transport messages between clients and the service. To horizontally scale, multiple server instances will be needed so new WebSocket connections can be load balanced between them.
With WebSocket connections load-balanced between the instances, we can utilise a message broker to fan out messages to all of the clients. For this project, we will be brokering messages with Redis PubSub.
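As a rough illustration of this fan-out, here is a minimal in-memory sketch in plain Java. The Broker and ServerInstance classes are hypothetical stand-ins and are not part of the chat service or of Redis: each instance subscribes to the broker once and relays every broadcast to the clients connected to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class FanOutSketch {

    /** Stand-in for the message broker: forwards every published message to all subscribers. */
    static final class Broker {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            subscribers.forEach(subscriber -> subscriber.accept(message));
        }
    }

    /** Stand-in for one server instance and the client connections assigned to it. */
    static final class ServerInstance {
        private final Broker broker;
        private final List<List<String>> clientInboxes = new ArrayList<>();

        ServerInstance(Broker broker) {
            this.broker = broker;
            // One broker subscription per instance, shared by all of its clients.
            broker.subscribe(this::deliverToLocalClients);
        }

        /** Simulates a new client connection and returns the inbox that client reads from. */
        List<String> connectClient() {
            List<String> inbox = new ArrayList<>();
            clientInboxes.add(inbox);
            return inbox;
        }

        /** A message sent by a local client goes to the broker, not straight to local clients. */
        void send(String message) {
            broker.publish(message);
        }

        private void deliverToLocalClients(String message) {
            clientInboxes.forEach(inbox -> inbox.add(message));
        }
    }
}
```

With two instances sharing one Broker, a message sent through either instance ends up in the inbox of every client, regardless of which instance the load balancer assigned it to.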
Redis PubSub
Redis PubSub will allow for a high throughput of messages to be fanned out to its subscribers with minimal latency. It can also be distributed to multiple instances to increase throughput and redundancy. Here is a great talk by Shahar Mor that goes into more detail.
Limitations
One trade-off with Redis PubSub is that it only "fires and forgets" to subscribers and offers no way to guarantee message delivery. This could result in messages being dropped if a server instance is restarted and users have to reconnect.
Adding guaranteed message delivery is outside the scope of this post. The argument could also be made that infrequent message drops for some users would not affect the experience of a live stream chat.
If you require a higher degree of message reliability it is worth checking out other message brokers such as Kafka, RabbitMQ or even Redis Streams.
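The fire-and-forget behaviour described in this section can be sketched with a small in-memory channel. This is an illustrative model only, not Redis itself: the Channel class and its methods are hypothetical, and the receiver count returned by publish is loosely modelled on the reply of the Redis PUBLISH command.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class FireAndForgetSketch {

    /** In-memory channel that keeps no copy of the messages it delivers. */
    static final class Channel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void unsubscribe(Consumer<String> subscriber) {
            subscribers.remove(subscriber);
        }

        /**
         * Delivers the message to whoever is subscribed right now and returns
         * how many subscribers received it. Nothing is stored for later.
         */
        int publish(String message) {
            List<Consumer<String>> current = new ArrayList<>(subscribers);
            current.forEach(subscriber -> subscriber.accept(message));
            return current.size();
        }
    }
}
```

A subscriber that is away while a message is published never sees it, even after it subscribes again. That is the gap a restarting server instance would experience.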
Testing
In a later post, we will be developing a React client that will connect to our chat service. Until then, we will check that the requirements are met by developing integration tests within the Spring framework.
We will use the Playtika testcontainers-springboot library to easily spin up a Redis docker container while running the integration tests locally. This can then be used as our PubSub message broker as we test the functionality of the connection server.
All integration tests used to meet the requirements can be found here.
Implementation
Warning! There are a lot of areas to cover, so this will be a bit of an info dump. I am always happy to answer questions to clear up any details.
Spring WebFlux
We will be utilising the non-blocking Spring WebFlux framework for developing the connection server. This will help to keep latency low when the server is handling a high number of concurrent connections. WebFlux will also allow us to utilise Project Reactor's Reactive Streams implementation to write asynchronous code that is easier to reason about.
For information on the advantages of reactive programming feel free to visit the Spring or Project Reactor sites.
To use Spring WebFlux this dependency is required.
implementation 'org.springframework.boot:spring-boot-starter-webflux'
PubSub Service
We will be abstracting the PubSub functionality into a service with a common message class and interface.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
String username;
String message;
}
public interface PubSubService {
Mono<Void> publish(Message message);
Flux<Message> subscribe();
}
Redis Configuration
To interface with the Redis server, we will need the Spring Boot Redis dependency.
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
Redis will also have to be configured so that we can utilise its reactive API. We will be adding the necessary configuration beans into the RedisPubSubConfig class.
The ReactiveRedisTemplate bean is required to publish messages and is configured so it can serialise Message objects into JSON.
@Bean
public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer<Message> valueSerializer =
new Jackson2JsonRedisSerializer<>(Message.class);
RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
RedisSerializationContext.newSerializationContext(keySerializer);
RedisSerializationContext<String, Message> context =
builder.value(valueSerializer).build();
return new ReactiveRedisTemplate<>(factory, context);
}
A ReactiveRedisMessageListenerContainer bean is needed for subscribing to message channels.
@Bean
ReactiveRedisMessageListenerContainer container(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisMessageListenerContainer(factory);
}
Redis PubSub Service
We can now create the RedisPubSubService by implementing the PubSubService interface and injecting the configured Redis beans.
@Service
public class RedisPubSubService implements PubSubService {
private final ReactiveRedisTemplate<String, Message> reactiveTemplate;
private final ReactiveRedisMessageListenerContainer reactiveMsgListenerContainer;
private final ChannelTopic channelTopic = new ChannelTopic("broadcast"); // channel used to send and receive messages
public RedisPubSubService(ReactiveRedisTemplate<String, Message> reactiveTemplate,
ReactiveRedisMessageListenerContainer reactiveMsgListenerContainer) {
this.reactiveMsgListenerContainer = reactiveMsgListenerContainer;
this.reactiveTemplate = reactiveTemplate;
}
The publish interface method uses the Redis Template to serialise and publish the message to the "broadcast" topic. An empty Mono is returned to signal that the message has been sent successfully.
@Override
public Mono<Void> publish(Message message) {
return this.reactiveTemplate
.convertAndSend(channelTopic.getTopic(), message)
.then(); // discard the receiver count and complete with an empty Mono<Void>
}
Subscribing to the channel is a bit more verbose and requires that we pass the channel as an Iterable. We also have to manually pass the SerializationContext from the Redis Template to deserialise incoming messages.
@Override
public Flux<Message> subscribe() {
return reactiveMsgListenerContainer
.receive(Collections.singletonList(channelTopic),
reactiveTemplate.getSerializationContext().getKeySerializationPair(),
reactiveTemplate.getSerializationContext().getValueSerializationPair())
.map(ReactiveSubscription.Message::getMessage);
}
We now have a functional PubSub service that can broadcast messages between the connection server instances.
Unit tests for this service can be found here.
RSocket
For connecting clients to the chat service, we will be using the RSocket protocol on top of a WebSocket connection. RSocket allows us to utilise the Reactive Streams standard over a network boundary and has some useful features such as backpressure and routing.
Backpressure will be particularly useful for our client application: if there is a sudden surge of messages, the client can limit the rate at which it accepts them.
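To make the idea of backpressure concrete, here is a minimal sketch using the JDK's java.util.concurrent.Flow interfaces rather than RSocket itself. MessagePublisher and BatchingClient are hypothetical names introduced for illustration; the point is only that the publisher emits nothing until the subscriber signals demand with request(n).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Emits chat messages only when the subscriber has requested them. */
    static final class MessagePublisher implements Flow.Publisher<String> {
        private final List<String> messages;

        MessagePublisher(List<String> messages) {
            this.messages = messages;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("demand must be positive"));
                        return;
                    }
                    // Emit at most n messages, then wait for the next request.
                    for (long i = 0; i < n && !done && next < messages.size(); i++) {
                        subscriber.onNext(messages.get(next++));
                    }
                    if (!done && next == messages.size()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** A client that decides for itself how many messages it is ready to accept. */
    static final class BatchingClient implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        boolean completed = false;
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription; // no demand signalled yet
        }

        @Override
        public void onNext(String message) {
            received.add(message);
        }

        @Override
        public void onError(Throwable error) {
            completed = true;
        }

        @Override
        public void onComplete() {
            completed = true;
        }

        void requestMore(long n) {
            subscription.request(n);
        }
    }
}
```

A client that calls requestMore(2) receives exactly two messages, however many the publisher has queued, and gets the rest only when it asks again.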
To utilise RSocket we need to include the Spring Boot RSocket dependency:
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
We will also need to configure RSocket to use the WebSocket transport and to set an endpoint for clients to connect to.
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rs
Next, we can implement the PubSubController that is responsible for handling RSocket messages from the client. This controller injects the PubSubService we created earlier.
@Controller
@Slf4j
public class PubSubController {
private final PubSubService messagingService;
public PubSubController(PubSubService messagingService) {
this.messagingService = messagingService;
}
// .....
Spring's RSocket support provides two helpful annotations:
- @ConnectMapping - used to handle when a new connection is made.
- @MessageMapping - used to handle messages for a particular route.
We will utilise @ConnectMapping to log when new connections are made or closed.
@ConnectMapping
void onConnect(RSocketRequester requester) {
Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null")
.onClose()
.doOnError(error -> log.warn(requester.rsocketClient() + " Closed"))
.doFinally(consumer -> log.info(requester.rsocketClient() + " Disconnected"))
.subscribe();
}
@MessageMapping("publish") will forward sent messages to the PubSubService.
@MessageMapping("publish")
Mono<Void> publish(Message message) {
return messagingService.publish(message);
}
@MessageMapping("subscribe") will stream the messages from the PubSubService to the client.
@MessageMapping("subscribe")
Flux<Message> subscribe() {
return messagingService.subscribe();
}
Auth0 Authentication
We will be using the third-party service Auth0 for user sign-up and authentication. The JWT access tokens provided by Auth0 are also compatible with RSocket. This will allow us to secure individual messaging routes.
A great tutorial on using Auth0 with Spring Boot can be found here.
The unique username entered at sign up will be encoded into the access token and decoded whenever a message is sent. This removes the need to store the username in the chat service. Due to this encoding, the access token should be short-lived to reduce the risk of being compromised.
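It helps to remember that the payload of a signed JWT is only base64url-encoded JSON, not encrypted, so anyone holding a token can read its claims. The sketch below shows this with the JDK alone. JwtPayloadSketch and decodePayload are hypothetical names, and the code deliberately does not verify the signature, so it is for illustration only. In the chat service itself, decoding and verification are left to Spring Security.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class JwtPayloadSketch {

    /**
     * Returns the JSON claims of a JWT as a string.
     * Assumption: the token has the usual header.payload.signature layout.
     * The signature is NOT checked here, so the result must not be trusted.
     */
    static String decodePayload(String jwt) {
        String[] parts = jwt.split("\\.", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected header.payload.signature");
        }
        byte[] json = Base64.getUrlDecoder().decode(parts[1]);
        return new String(json, StandardCharsets.UTF_8);
    }
}
```

Because the username claim is readable this way, a leaked token exposes it immediately, which is one more reason to keep access tokens short-lived.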
To be able to decode and verify JWT tokens in our app we will need this dependency.
implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server'
We can then configure the authentication server endpoint and add properties for the correct Auth0 audience and the custom claim used to encode the username.
spring.security.oauth2.resourceserver.jwt.issuer-uri=https://{YOUR_AUTH0_DOMAIN}.us.auth0.com/
auth0.audience={YOUR_AUTH0_API_AUDIENCE}
auth0.username-claim={YOUR_AUTH0_USERNAME_CLAIM}
Securing RSocket
To secure RSocket include the RSocket Security dependency.
implementation 'org.springframework.security:spring-security-rsocket'
To be able to pass the decoded JWT token as an AuthenticationPrincipal we will need the Spring Security messaging dependency.
implementation 'org.springframework.security:spring-security-messaging'
We can now create the RSocketSecurityConfig class by injecting the Auth0 properties added earlier. We can also add the @EnableRSocketSecurity and @EnableReactiveMethodSecurity annotations to activate security support.
@Configuration
@EnableRSocketSecurity
@EnableReactiveMethodSecurity
public class RSocketSecurityConfig {
@Value("${auth0.audience}")
String audience;
@Value("${spring.security.oauth2.resourceserver.jwt.issuer-uri}")
String issuer;
@Value("${auth0.username-claim}")
String usernameClaim;
//......
To decode the JWT token we will need to create a ReactiveJwtDecoder bean. Custom OAuth2TokenValidators are also added to check that the audience and username claims are included in the token.
@Bean
public ReactiveJwtDecoder reactiveJwtDecoder() {
var reactiveJwtDecoder = (NimbusReactiveJwtDecoder) ReactiveJwtDecoders.fromOidcIssuerLocation(issuer);
OAuth2TokenValidator<Jwt> audienceValidator = (jwt) -> {
OAuth2Error error = new OAuth2Error("invalid_token", "The required audience is missing", null);
if (jwt.getAudience().contains(audience)) {
return OAuth2TokenValidatorResult.success();
}
return OAuth2TokenValidatorResult.failure(error);
};
OAuth2TokenValidator<Jwt> usernameValidator = (jwt) -> {
OAuth2Error error = new OAuth2Error("invalid_token", "The required username is missing", null);
if (jwt.getClaimAsString(usernameClaim) != null) {
return OAuth2TokenValidatorResult.success();
}
return OAuth2TokenValidatorResult.failure(error);
};
OAuth2TokenValidator<Jwt> withIssuer = JwtValidators.createDefaultWithIssuer(issuer);
OAuth2TokenValidator<Jwt> compositeValidator = new DelegatingOAuth2TokenValidator<>(withIssuer, audienceValidator, usernameValidator);
reactiveJwtDecoder.setJwtValidator(compositeValidator);
return reactiveJwtDecoder;
}
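The validation logic above boils down to running several independent checks over the token's claims and collecting any errors. Here is a plain-Java sketch of that idea without Spring Security. ClaimValidator, requireAudience, requireClaim and allOf are hypothetical names, and the claims are modelled as a simple Map rather than a Jwt object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ClaimValidationSketch {

    /** Returns a list of error descriptions; an empty list means the claims passed. */
    interface ClaimValidator {
        List<String> validate(Map<String, Object> claims);
    }

    /** Passes when the "aud" claim (a single string or a list) contains the expected audience. */
    static ClaimValidator requireAudience(String audience) {
        return claims -> {
            Object aud = claims.get("aud");
            boolean present = aud instanceof List
                    ? ((List<?>) aud).contains(audience)
                    : audience.equals(aud);
            return present
                    ? List.<String>of()
                    : List.of("The required audience is missing");
        };
    }

    /** Passes when the named claim is present as a string. */
    static ClaimValidator requireClaim(String name) {
        return claims -> claims.get(name) instanceof String
                ? List.<String>of()
                : List.of("The required claim is missing: " + name);
    }

    /** Runs every validator and gathers all of their errors. */
    static ClaimValidator allOf(ClaimValidator... validators) {
        return claims -> {
            List<String> errors = new ArrayList<>();
            for (ClaimValidator validator : validators) {
                errors.addAll(validator.validate(claims));
            }
            return errors;
        };
    }
}
```

A token is accepted only when the combined validator returns no errors, so a token with the wrong audience and no username claim would be rejected with two errors.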
To prevent unauthorised users from sending messages we can secure the "publish" route by configuring a PayloadSocketAcceptorInterceptor bean to require JWT tokens.
@Bean
public PayloadSocketAcceptorInterceptor rsocketInterceptor(RSocketSecurity rSocketSecurity) {
return rSocketSecurity.authorizePayload(authorize ->
authorize.route("publish").authenticated()
.anyExchange().permitAll()) // everything else is permitted
.jwt(Customizer.withDefaults())
.build();
}
An RSocketMessageHandler bean is also configured to allow an AuthenticationPrincipal to be resolved in RSocket handler methods.
@Bean
RSocketMessageHandler messageHandler(RSocketStrategies strategies) {
RSocketMessageHandler mh = new RSocketMessageHandler();
mh.getArgumentResolverConfigurer().addCustomResolver(
new AuthenticationPrincipalArgumentResolver());
mh.setRouteMatcher(new PathPatternRouteMatcher());
mh.setRSocketStrategies(strategies);
return mh;
}
We can now access the decoded username claim when handling a message.
@Value("${auth0.username-claim}")
String usernameClaim;
//......
@MessageMapping("publish")
Mono<Void> publish(String message, @AuthenticationPrincipal Mono<Jwt> token) {
return token.map(jwt -> jwt.getClaimAsString(usernameClaim))
.flatMap(username -> messagingService.publish(new Message(username, message)));
}
Conclusion
This wraps up the overview of the implementation. Again, I am always happy to answer any questions about this post or the reasoning behind the technology chosen.
Feel free to check the full source on GitHub.