Spring WebFlux is a web framework built on top of Project Reactor. It provides asynchronous, non-blocking I/O, which can help your application perform better under load. If you're familiar with Spring MVC and building REST APIs, much of the programming model will look familiar.
Why Reactive Streams?
The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – like passing elements on to another thread or thread-pool – while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.
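To make the back pressure idea concrete, here is a minimal sketch using the JDK's own `java.util.concurrent.Flow` API (the Reactive Streams interfaces shipped since Java 9) rather than Reactor. The class name `BackpressureSketch`, the subscriber `OneAtATimeSubscriber` and the buffer size of 2 are assumptions made for illustration; they do not come from the chat application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class BackpressureSketch {

    /** Hypothetical subscriber that asks for one element at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for exactly one element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next element only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count across a thread boundary through a bounded buffer. */
    static List<Integer> run(int count) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        // Per-subscriber buffer of 2: submit() blocks the producer while the buffer is full,
        // so the queue between the two threads stays bounded.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 2)) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the buffered items have been delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdown();
        }
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5)); // expected to print [1, 2, 3, 4, 5]
    }
}
```

The producer never gets more than two elements ahead of the consumer, which is the bounded-queue property described above. Reactor's `Flux` implements the same contract, with many more operators on top.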
Basic architecture
In WebFlux, WebSockets are handled by implementing WebSocketHandler. The handler is provided with a WebSocketSession every time a connection is established. A WebSocketSession represents the connection made by a single browser. It has two streams associated with it: receive(), which returns a Flux of incoming messages, and send(), which accepts a stream of outgoing messages.
To link every WebSocketSession, a global message publisher (of type Sinks.Many) is used. The publisher exposes a single Flux stream of all the messages it receives.
Since not all clients connect at the same time, the publisher is configured to retain the last 1000 messages and replay them to any new subscriber.
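The "retain the last N messages and replay them to new subscribers" behaviour can be sketched without Reactor. The class below, `ReplayBroadcaster`, is a hypothetical, single-threaded stand-in written only to show the semantics; it is not how `Sinks.many().replay().limit(...)` is implemented, and it ignores back pressure and thread safety.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of "replay with limit" semantics (not Reactor's implementation). */
public class ReplayBroadcaster<T> {
    private final int limit;
    private final Deque<T> history = new ArrayDeque<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    public ReplayBroadcaster(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    /** Stores the message, evicting the oldest one if the history is full, then fans it out. */
    public void emit(T message) {
        if (history.size() == limit) {
            history.removeFirst();
        }
        history.addLast(message);
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    /** Replays the retained history to the new subscriber, then registers it for live messages. */
    public void subscribe(Consumer<T> subscriber) {
        history.forEach(subscriber);
        subscribers.add(subscriber);
    }

    public static void main(String[] args) {
        ReplayBroadcaster<String> chat = new ReplayBroadcaster<>(2);
        chat.emit("a");
        chat.emit("b");
        chat.emit("c");

        List<String> lateJoiner = new ArrayList<>();
        chat.subscribe(lateJoiner::add); // receives the last two messages: b, c
        chat.emit("d");                  // then live messages as they arrive

        System.out.println(lateJoiner); // [b, c, d]
    }
}
```

A client that connects late sees only the most recent messages up to the limit, followed by everything sent after it joined, which is what the chat application relies on with a limit of 1000.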
Configure WebFlux to Handle WebSockets
To create a WebSocket server, we first create a WebSocketHandler that broadcasts received messages to all connected clients in real time:
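import java.util.concurrent.atomic.AtomicReference;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Slf4j
@Component
public class ChatWebSocketHandler implements WebSocketHandler {

    // Shared by all sessions; replays the last 1000 events to every new subscriber.
    private final Sinks.Many<Event> chatHistory = Sinks.many().replay().limit(1000);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Remembers the last event of this session so the sender's name is known on disconnect.
        AtomicReference<Event> lastReceivedEvent = new AtomicReference<>();
        return session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(this::toEvent)
                .doOnNext(event -> {
                    lastReceivedEvent.set(event);
                    chatHistory.tryEmitNext(event);
                })
                .doOnComplete(() -> {
                    if (lastReceivedEvent.get() != null) {
                        // Caution: this mutates an instance that is already held in the replay
                        // history; emitting a separate LEAVE event would avoid rewriting it.
                        lastReceivedEvent.get().setType(EventType.LEAVE);
                        chatHistory.tryEmitNext(lastReceivedEvent.get());
                    }
                    log.info("Completed!");
                })
                // Outbound side: push every event from the shared history to this session.
                .zipWith(session.send(chatHistory.asFlux()
                        .map(this::toString)
                        .map(session::textMessage)))
                .then();
    }

    @SneakyThrows
    private Event toEvent(String message) {
        return objectMapper.readValue(message, Event.class);
    }

    @SneakyThrows
    private String toString(Event event) {
        return objectMapper.writeValueAsString(event);
    }
}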
Then we can map it to a URL:
@Bean
public HandlerMapping webSocketMapping(ChatWebSocketHandler webSocketHandler) {
    return new SimpleUrlHandlerMapping(Map.of("/chat", webSocketHandler), -1);
}
Publish Session Disconnects
All clients are notified when one client disconnects. To make this possible, the handler keeps track of the latest message received within a session, so that the user's name can be retrieved when the session completes.
.doOnComplete(() -> {
    if (lastReceivedEvent.get() != null) {
        lastReceivedEvent.get().setType(EventType.LEAVE);
        chatHistory.tryEmitNext(lastReceivedEvent.get());
    }
})
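One detail worth considering in the snippet above: the event stored in lastReceivedEvent is the same object that was already emitted to chatHistory, so calling setType on it may also change what the replay history hands to clients that connect later. A possible alternative is to emit a separate LEAVE event. The sketch below assumes Java 16 or newer and uses a hypothetical immutable `Event` record with `sender`, `content` and `type` fields (inferred from the JSON the frontend sends); the real Event class in the project may look different.

```java
import java.util.ArrayList;
import java.util.List;

public class LeaveEventSketch {

    enum EventType { JOIN, CHAT, LEAVE }

    /** Hypothetical immutable stand-in for the article's Event class. */
    record Event(String sender, String content, EventType type) {

        /** Builds a new LEAVE event for the same sender, leaving this instance untouched. */
        Event asLeave() {
            return new Event(sender, null, EventType.LEAVE);
        }
    }

    public static void main(String[] args) {
        // A plain list stands in for the replay history here.
        List<Event> history = new ArrayList<>();

        Event last = new Event("alice", "hello", EventType.CHAT);
        history.add(last);
        history.add(last.asLeave()); // emitted on disconnect instead of mutating 'last'

        System.out.println(history.get(0).type()); // CHAT
        System.out.println(history.get(1).type()); // LEAVE
    }
}
```

With this approach the original chat message keeps its CHAT type in the history, and the disconnect shows up as its own entry.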
Frontend
The frontend is written in vanilla JavaScript, HTML5 and CSS, and lives under the resources folder. It is served by Spring Boot as static resources; the routes are defined in code in FrontEndConfig.
Configure Spring Boot to serve the frontend
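import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Configuration
public class FrontEndConfig {
    // Serves index.html for both "/" and "/index.html".
    @Bean
    public RouterFunction<ServerResponse> indexRouter(@Value("classpath:/index.html") final Resource indexHtml) {
        return route(GET("/").or(GET("/index.html")),
                request -> ok().contentType(MediaType.TEXT_HTML).bodyValue(indexHtml));
    }

    @Bean
    public RouterFunction<ServerResponse> cssRouter() {
        return RouterFunctions
                .resources("/css/**", new ClassPathResource("css/"));
    }

    @Bean
    public RouterFunction<ServerResponse> jsRouter() {
        return RouterFunctions
                .resources("/js/**", new ClassPathResource("js/"));
    }

    @Bean
    public RouterFunction<ServerResponse> imgRouter() {
        return RouterFunctions
                .resources("/img/**", new ClassPathResource("img/"));
    }
}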
Since the server keeps the history only up to a certain limit, I decided to implement the search/filtering logic on the frontend, using regular expressions. The core of the client code handles connecting, sending and rendering messages:
const connect = (event) => {
    username = document.querySelector('#name').value.trim();
    if (username) {
        usernamePage.classList.add('hidden');
        chatPage.classList.remove('hidden');
        socket = new WebSocket("ws://localhost:8080/chat");
        socket.onopen = () => {
            //loadHistory();
            socket.send(JSON.stringify({sender: username, type: 'JOIN'}));
            connectingElement.classList.add('hidden');
        };
        socket.onmessage = (event) => onMessageReceived(event.data);
        socket.onclose = (event) => {
            username = null;
            if (event.wasClean) {
                console.log(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
            } else {
                alert('[close] Connection died');
            }
        };
        socket.onerror = (error) => {
            console.error(`[error] ${error.message}`);
            connectingElement.textContent = 'Unable to connect to the server! Please refresh the page and try again.';
            connectingElement.style.color = 'red';
        };
    }
    event.preventDefault();
}
const send = (event) => {
    // Always suppress the default form submission, even when the input is empty.
    event.preventDefault();
    const messageContent = messageInput.value;
    if (messageContent) {
        const chatMessage = {
            sender: username,
            content: messageContent,
            type: 'CHAT'
        };
        socket.send(JSON.stringify(chatMessage));
        messageInput.value = '';
    }
}
const onMessageReceived = (payload) => {
    const message = JSON.parse(payload);
    renderMessage(message);
}
const renderMessage = (message) => {
    const messageElement = document.createElement('li');
    if (message.type === 'JOIN') {
        messageElement.classList.add('event-message');
        message.content = message.sender + ' joined!';
    } else if (message.type === 'LEAVE') {
        messageElement.classList.add('event-message');
        message.content = message.sender + ' left!';
    } else {
        messageElement.classList.add('chat-message');
        const avatarElement = document.createElement('i');
        const avatarText = document.createTextNode(message.sender[0]);
        avatarElement.appendChild(avatarText);
        avatarElement.style['background-color'] = getAvatarColor(getHash(message.sender));
        messageElement.appendChild(avatarElement);
        const usernameElement = document.createElement('span');
        const usernameText = document.createTextNode(message.sender);
        usernameElement.appendChild(usernameText);
        messageElement.appendChild(usernameElement);
    }
    const textElement = document.createElement('p');
    const messageText = document.createTextNode(message.content);
    textElement.appendChild(messageText);
    messageElement.appendChild(textElement);
    messageArea.appendChild(messageElement);
    messageArea.scrollTop = messageArea.scrollHeight;
}
Integration test
@Test
@DisplayName("test - send 4 messages to the server /chat endpoint. Server should broadcast the message to the registered clients")
void shouldBroadcastAllSentMessagesToTheClient() {
    // client
    WebSocketClient client = new ReactorNettyWebSocketClient();
    String msgToBeSent = eventToString(TEST_PAYLOAD);
    Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
    int count = 4;
    Flux<String> input = Flux.range(1, count)
            .map(index -> msgToBeSent.replace(CHAT_MSG_CONTENT, CHAT_MSG_CONTENT + index));

    // Send the messages, then collect the first 'count' broadcasts into the sink.
    client.execute(URI.create(format("ws://localhost:%s/chat", port)),
                    session -> session.send(input.map(session::textMessage))
                            .thenMany(session.receive()
                                    .take(count)
                                    .map(WebSocketMessage::getPayloadAsText)
                                    .doOnNext(sink::tryEmitNext)
                                    .then())
                            .then())
            .block(Duration.ofSeconds(1));

    StepVerifier.create(sink.asFlux().take(count).map(TestHelper::stringToEvent).map(Event::getContent))
            .expectNext(CHAT_MSG_CONTENT + 1)
            .expectNext(CHAT_MSG_CONTENT + 2)
            .expectNext(CHAT_MSG_CONTENT + 3)
            .expectNext(CHAT_MSG_CONTENT + 4)
            .verifyComplete();
}