DEV Community

mahmoudkarrar
mahmoudkarrar

Posted on

How To Build a Real Time Chat App Using WebFlux, WebSockets & Javascript

Spring WebFlux is a web framework that’s built on top of Project Reactor, to give you asynchronous I/O, and allow your application to perform better. If you’re familiar with Spring MVC and building REST APIs.

Why Reactive Streams?

The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – like passing elements on to another thread or thread-pool – while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. more..

Basic architecture

In WebFlow, WebSockets are handled by implementing WebSocketHandler. The handler is provided with a WebSocketSession every time a connection is established. A WebSocketSession represents the connection made by a single browser. It has 2 Flux streams associated with it, the receive() stream for incoming messages and the send() stream outgoing messages.

To link every WebSocketSession, a global message publisher (of type Sink.Many) is used. The publisher contains one Flux stream for all messages it receives.

Since not all clients will connect at the same time, the publisher is configured to retain the last 1000 messages and replays it to any new subscribers.

Image description

Configure WebFlux to Handle WebSockets

To create a WebSocket server we create first a WebSocketHandler that broadcasts received messages to all connected clients in real time. We do the following:

@Slf4j
@Component
public class ChatWebSocketHandler implements WebSocketHandler {

    private final Sinks.Many<Event> chatHistory = Sinks.many().replay().limit(1000);
    private final ObjectMapper objectMapper = new ObjectMapper();

    public Mono<Void> handle(WebSocketSession session) {
        AtomicReference<Event> lastReceivedEvent = new AtomicReference<>();
        return session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(this::toEvent)
                .doOnNext(event -> {
                    lastReceivedEvent.set(event);
                    chatHistory.tryEmitNext(event);
                })
                .doOnComplete(() -> {
                    if(lastReceivedEvent.get() != null) {
                        lastReceivedEvent.get().setType(EventType.LEAVE);
                        chatHistory.tryEmitNext(lastReceivedEvent.get());
                    }
                    log.info("Completed!");
                })
                .zipWith(session.send(chatHistory.asFlux()
                        .map(this::toString)
                        .map(session::textMessage)))
                .then();
    }

    @SneakyThrows
    private Event toEvent(String message) {
        return objectMapper.readValue(message, Event.class);
    }

    @SneakyThrows
    private String toString(Event event) {
        return objectMapper.writeValueAsString(event);
    }
}
Enter fullscreen mode Exit fullscreen mode

Then we can map it to a URL:

@Bean
    public HandlerMapping webSocketMapping(ChatWebSocketHandler webSocketHandler) {
        return new SimpleUrlHandlerMapping(Map.of("/chat", webSocketHandler), -1);
    }
Enter fullscreen mode Exit fullscreen mode

Publish Session Disconnects

Notify all clients when one client disconnects. By keeping track of the latest received message within a session so that we can retrieve user's name.

.doOnComplete(() -> {
                    if(lastReceivedEvent.get() != null) {
                        lastReceivedEvent.get().setType(EventType.LEAVE);
                        chatHistory.tryEmitNext(lastReceivedEvent.get());
                    }
Enter fullscreen mode Exit fullscreen mode

Frontend

JS vanilla, HTML5 and CSS under resources folder. Using Springboot capability to serve frontend as static resources, configurations are defined as code in FrontEndConfig.

Configure springboot to serve the frontend

@Configuration
public class FrontEndConfig {
    @Bean
    public RouterFunction<ServerResponse> indexRouter(@Value("classpath:/index.html") final Resource indexHtml){
        return  route(GET("/").or(GET("/index.html")),
                request -> ok().contentType(MediaType.TEXT_HTML).bodyValue(indexHtml));
    }
    @Bean
    public RouterFunction<ServerResponse> cssRouter() {
        return RouterFunctions
                .resources("/css/**", new ClassPathResource("css/"));
    }
    @Bean
    public RouterFunction<ServerResponse> jsRouter() {
        return RouterFunctions
                .resources("/js/**", new ClassPathResource("js/"));
    }
    @Bean
    public RouterFunction<ServerResponse> imgRouter() {
        return RouterFunctions
                .resources("/img/**", new ClassPathResource("img/"));
    }
}

Enter fullscreen mode Exit fullscreen mode

Since we have server which keep the history up to certain limit, I decided to implement search/filtering logic in the frontend side using regex.

const connect = (event) => {
    username = document.querySelector('#name').value.trim();

    if (username) {
        usernamePage.classList.add('hidden');
        chatPage.classList.remove('hidden');

        socket = new WebSocket("ws://localhost:8080/chat");

        socket.onopen = () => {
            //loadHistory();
            socket.send(JSON.stringify({sender: username, type: 'JOIN'}));
            connectingElement.classList.add('hidden');
        };

        socket.onmessage = (event) => onMessageReceived(event.data);

        socket.onclose = (event) => {
            username = null;
            if (event.wasClean) {
                console.log(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`);
            } else {
                alert('[close] Connection died');
            }
        };

        socket.onerror =  (error) => {
            console.error(`[error] ${error.message}`);
            connectingElement.textContent = 'Unable to connect to the server! Please refresh the page and try again.';
            connectingElement.style.color = 'red';
        }

    }
    event.preventDefault();
}


const send = (event) => {
    const messageContent = messageInput.value;

    if (messageContent) {
        const chatMessage = {
            sender: username,
            content: messageInput.value,
            type: 'CHAT'
        };

        socket.send(JSON.stringify(chatMessage));
        messageInput.value = '';
        event.preventDefault();
    }
}


const onMessageReceived = (payload) => {
    const message = JSON.parse(payload);
    renderMessage(message);
}

const renderMessage = (message) => {
    const messageElement = document.createElement('li');

    if (message.type === 'JOIN') {
        messageElement.classList.add('event-message');
        message.content = message.sender + ' joined!';
    } else if (message.type === 'LEAVE') {
        messageElement.classList.add('event-message');
        message.content = message.sender + ' left!';
    } else {
        messageElement.classList.add('chat-message');

        const avatarElement = document.createElement('i');
        const avatarText = document.createTextNode(message.sender[0]);
        avatarElement.appendChild(avatarText);
        avatarElement.style['background-color'] = getAvatarColor(getHash(message.sender));

        messageElement.appendChild(avatarElement);

        const usernameElement = document.createElement('span');
        const usernameText = document.createTextNode(message.sender);
        usernameElement.appendChild(usernameText);
        messageElement.appendChild(usernameElement);
    }

    const textElement = document.createElement('p');
    const messageText = document.createTextNode(message.content);
    textElement.appendChild(messageText);

    messageElement.appendChild(textElement);

    messageArea.appendChild(messageElement);
    messageArea.scrollTop = messageArea.scrollHeight;
}

Enter fullscreen mode Exit fullscreen mode

Integration test

@Test
    @DisplayName("test - send 4 messages to the server /chat endpoint. Server should broadcast the message to the registered clients")
    void shouldBroadcastAllSentMessagesToTheClient() {
        //clients
        WebSocketClient client = new ReactorNettyWebSocketClient();

        String msgToBeSent = eventToString(TEST_PAYLOAD);
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
        int count = 4;
        Flux<String> input = Flux.range(1, count).map(index -> msgToBeSent.replace(CHAT_MSG_CONTENT, CHAT_MSG_CONTENT + index));

        client.execute(URI.create(format("ws://localhost:%s/chat", port)),
                        session -> session.send(input.map(session::textMessage))
                                .thenMany(
                                        session.receive().take(count).map(WebSocketMessage::getPayloadAsText)
                                                .doOnNext(sink::tryEmitNext)
                                                .then())
                                .then())
                .block(Duration.ofSeconds(1));


        StepVerifier.create(sink.asFlux().take(count).map(TestHelper::stringToEvent).map(Event::getContent))
                .expectNext(CHAT_MSG_CONTENT + 1)
                .expectNext(CHAT_MSG_CONTENT + 2)
                .expectNext(CHAT_MSG_CONTENT + 3)
                .expectNext(CHAT_MSG_CONTENT + 4)
                .verifyComplete();
    }

Enter fullscreen mode Exit fullscreen mode

Resources

  1. How To Build a Chat App Using WebFlux, WebSockets & React
  2. WebSocket API Spring

Sourcecode on github

Code repo

Top comments (0)