DEV Community

Anthony Ikeda
Anthony Ikeda

Posted on

Quickest WebSockets with Quarkus and Angular 10

Quarkus helps you get started with both messaging and WebSockets really quickly. But what happens when you want to marry the two technologies together?

Before we start though the source code used in this articel can be found here:

https://github.com/cloudy-engineering/quarkus-chat-api
https://github.com/cloudy-engineering/angular-chat-ui


Getting started with Quarkus WebSockets is as simple as this:

$ mvn io.quarkus:quarkus-maven-plugin:1.7.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=websockets-quickstart \
    -Dextensions="undertow-websockets"
$ cd websockets-quickstart
Enter fullscreen mode Exit fullscreen mode

This will create your typical maven based source code structure with the following highlights:

  • quarkus-undertow-websockets dependency added
  • quarkus-resteasy dependency add by default

Getting your first integration up and running is actually quite easy.

  1. Create a new class representing the WebSocket Endpoint
  2. Implement the standard WebSocket lifecycle methods (onOpen, onError, onMessage, onClose)
  3. Create a UI to integrate with your WebSocket endpoint

Implementing the Functionality

Rather than implementing an interface or extending a base class, Quarkus WebSockets uses annotations to implement the lifecycle:

@ServerEndpoint("/chat/{username}")
public class SocketEndpoint {

    private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);

    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {
        log.debug("{} has just connected", username);
    }

    @OnError
    public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
        log.error("{} encountered an error", username);
    }

    @OnMessage
    public void onMessage(String message, @PathParam("username") String username) {
        log.debug("{} has just sent us a message: {}", username, message);
    }

    @OnClose
    public void onClose(Session session, @PathParam("username") String username) {
        log.debug("{} has now disconnected", username);
    }
}
Enter fullscreen mode Exit fullscreen mode

The thing to remember with the server-side component is the Session. This is the way you communicate with the end user. For the sake of this article we are going to use the AsyncRemote and send an object back to the user.

@ServerEndpoint("/chat/{username}")
@ApplicationScoped
public class SocketEndpoint {

    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {
        log.debug("{} has just connected", username);
        session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
    }

...
}
Enter fullscreen mode Exit fullscreen mode

When you connect via the front end, the onOpen method will be instantiated. Here you can set up the user's interaction and send back a message confirming any actions. Here we will just send back a reply.


Logging and a CORS
Before we continue we are going to configure logging to ensure we can see our debug messages.

In the src/main/resources/application.properties file add the following entries:

quarkus.log.category."com.brightfield.streams".level=ALL
Enter fullscreen mode Exit fullscreen mode

We should also enable CORS so we will also need:

quarkus.http.cors.enabled=true
quarkus.http.cors.origins=http://localhost:4200
quarkus.http.cors.methods=get,post,put,head,options
Enter fullscreen mode Exit fullscreen mode

And since I have so many apps running on port 8080 I’m going to change the port to 8011:

quarkus.http.port=8011
Enter fullscreen mode Exit fullscreen mode

Let's create a unit test to test this out:

ServerEndpointTest.java

package com.brightfield.streams;

import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

@QuarkusTest
public class SocketEndpointTest {

    private static final LinkedBlockingDeque<String> MESSAGES = new LinkedBlockingDeque<>();

    @TestHTTPResource("/chat/testuser")
    URI uri;

    @Test
    public void testWebSocketChat() throws Exception {
        try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
            Assertions.assertEquals("Connecting to central control...", MESSAGES.poll(10, TimeUnit.SECONDS));
            Assertions.assertEquals("Welcome to the show testuser", MESSAGES.poll(10, TimeUnit.SECONDS));
        }
    }

    @ClientEndpoint
    public static class Client {
        private final Logger log = LoggerFactory.getLogger(Client.class);

        @OnOpen
        public void open(final Session session) {
            log.debug("Connecting to server");
            String toSend = "Connecting to central control...";
            session.getAsyncRemote().sendText(toSend);
        }

        @OnMessage
        void message(final String message) {
            log.debug("Incoming message: {}", message);
            MESSAGES.add(message);
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

So, what have we done here?

First we set up a queue to store messages that are going through the pipeline. When ever we send a message or receive it on the client side we want to queue up the messages to validate the order they arrive.

In this case the first message will be the message sent when the Client.class first connects: "Welcome to the show "

When the client connects we are going to send our first message: "Connection to central control...". This will be our second message in the sequence.

If you compile and run the code you should see our test passing with the debug much like this:

INFO  [io.und.web.jsr] (main) UT026004: Adding annotated client endpoint class com.brightfield.streams.SocketEndpointTest$Client
INFO  [io.und.web.jsr] (main) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO  [io.quarkus] (main) Quarkus 1.7.2.Final on JVM started in 1.791s. Listening on: http://0.0.0.0:8081
INFO  [io.quarkus] (main) Profile test activated.
INFO  [io.quarkus] (main) Installed features: [cdi, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpointTest$Client] (main) Connecting to server
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just sent us a message: Connecting to central control...
DEBUG [com.bri.str.SocketEndpointTest$Client] (nioEventLoopGroup-2-1) Incoming message: Welcome to the show testuser
Enter fullscreen mode Exit fullscreen mode

If we consider the sequence of events:

  1. Client connects to the Server
  2. Server sends a welcome message: "Welcome to the show testuser"
  3. Client sends message to the server: "Connecting to central control..."
  4. Client receives message: "Welcome to the show testuser"

Our test tracks the server side interaction and the client side interaction.

Creating the UI

Let's look at creating a UI in Angular 10 to get a better picture.

Start by creating your Angular app:

$ ng new chat-ui
? Would you like to add Angular routing? Yes
? Which stylesheet format would you like to use? CSS
... 
Installing packages...
✔ Packages installed successfully.
    Successfully initialized git.
Enter fullscreen mode Exit fullscreen mode

Next we want to make sure we have Reactive Forms lodes in the app.module.ts:

@NgModule({
...
  imports: [
    BrowserModule,
    AppRoutingModule,
    BrowserAnimationsModule,
    ReactiveFormsModule,
  ],

});
Enter fullscreen mode Exit fullscreen mode

We want to create 2 classes:

  • Service that manages the WebSocket state
  • Component that displays our interactions
$ ng g s _services/socket
CREATE src/app/_services/socket.service.spec.ts (357 bytes)
CREATE src/app/_services/socket.service.ts (135 bytes)
$ ng g c chat
CREATE src/app/chat/chat.component.css (0 bytes)
CREATE src/app/chat/chat.component.html (19 bytes)
CREATE src/app/chat/chat.component.spec.ts (612 bytes)
CREATE src/app/chat/chat.component.ts (267 bytes)
UPDATE src/app/app.module.ts (388 bytes)
Enter fullscreen mode Exit fullscreen mode

For the sake of best practices, let's first configure an environment variable for the Quarkus application endpoint:

src/environments/environment.ts

export const environment = {
  production: false,
  socket_endpoint: 'ws://localhost:8011'
};
Enter fullscreen mode Exit fullscreen mode

Implementing our service

To connect and interact with the server-side components we are going to utilise some built-in rxjs classes:

import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
Enter fullscreen mode Exit fullscreen mode

RXJS and WebSockets
RXJS makes it just as easy to connect to a ServerEndpoint as it is to implement.

The WebSocketSubject represents the state of the communication between the client and server.  Just like a BehaviorSubject we are going to push messages and subscribe to the response over the WebSocketSubject.

The webSocket class represents our factory to create WebSocketSubject connection to the server. We will pass in the URL to our service and it will return the WebSocketSubject for us to push and subscribe to.


There are 3 parts of the lifecycle we need to implement:

  • Connect (onOpen)
  • Close/Destroy (onClose)
  • Send (onMessage)

chat.service.ts

import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';

@Injectable({
  providedIn: 'root'
})
export class SocketService {

  connection$: WebSocketSubject<any>;

  constructor() { }

  connect(): Observable<any> {
    this.connection$ = webSocket({
      url: `${env.socket_endpoint}/chat/angularuser`,
      deserializer: ({data}) => data,
      serializer: ({data}) => data,
    });
    return this.connection$;
  }
...
}
Enter fullscreen mode Exit fullscreen mode

When creating a WebSocket connection with RXJS, the default serialization/deserialization is JSON.parse. Since we are using plain test on our server side component, we will override the serde without parsing the data.

Later we will see how we call the connect() method and make the initial connection that we can send and receive messages over.

In order to send a message, we need to queue up the message much like any Observable you may have had experience with:

import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';

@Injectable({
  providedIn: 'root'
})
export class SocketService {

  connection$: WebSocketSubject<any>;

  constructor() { }

  connect(): Observable<any> {
    this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
    return this.connection$;
  }

  send(data: any): void {
    if (this.connection$) {
      this.connection$.next(data);
    } else {
      console.log('Did not send data, unable to open connection');
    }
  }

}
Enter fullscreen mode Exit fullscreen mode

While our connection$ pipe is open, we use the next() method to send our object to the server. If we have lost connectivity we will, for now, just log a message.

One last thing, if we ever disconnect from the server, we want to ensure we close the connection and trigger the backend event @OnClose, so let's implement a closeConnection() method and call it in an onDestroy() event:

chat.service.ts

import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';

@Injectable({
  providedIn: 'root'
})
export class SocketService {

  connection$: WebSocketSubject<any>;

  constructor() { }

  connect(): Observable<any> {
    this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
    return this.connection$;
  }

  send(data: any): void {
    if (this.connection$) {
      this.connection$.next(data);
    } else {
      console.log('Did not send data, unable to open connection');
    }
  }

  closeConnection(): void {
    if (this.connection$) {
      this.connection$.complete();
      this.connection$= null;
    }
  }

  ngOnDestroy() {
    this.closeConnection();
  }

}
Enter fullscreen mode Exit fullscreen mode

Creating our Angular Component

As you can see we have a very straight forward implementation that uses standard Observable patterns. In order to use this service you need to create a component that will initiate the connection and send the data over the websocket connection:

import { Component, OnInit } from '@angular/core';
import { SocketService } from '../_services/socket.service';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { FormControl } from '@angular/forms';

@Component({
  selector: 'app-chat',
  templateUrl: './chat.component.html',
  styleUrls: ['./chat.component.css']
})
export class ChatComponent implements OnInit {

  messages: string[] = [];
  msgControl = new FormControl('');
  destroyed$ = new Subject();

  constructor(private chatService: SocketService) { }

  ngOnInit(): void {
    const chatSub$ = this.chatService.connect().pipe(
      takeUntil(this.destroyed$),
    );

    chatSub$.subscribe(message => this.messages.push(message));
  }

  sendMessage(): void {
    this.chatService.send(this.msgControl.value);
    this.msgControl.setValue('');
  }

  ngOnDestroy(): void {
    this.destroyed$.next();
  }

}
Enter fullscreen mode Exit fullscreen mode

chat.component.html

<ul>
  <li *ngFor="let message of messages">{{ message }}</li>
</ul>
<input placeholder="Send a message..." [formControl]="msgControl">
<button (click)="sendMessage()">Send</button>
Enter fullscreen mode Exit fullscreen mode

Let's also quickly add a route for our new component:

app-routing.module.ts

import { NgModule } from '@angular/core';
import { Routes, RouterModule } from '@angular/router';
import { ChatComponent } from './chat/chat.component';

const routes: Routes = [
  { path: 'chat', component: ChatComponent }
];

@NgModule({
  imports: [RouterModule.forRoot(routes)],
  exports: [RouterModule]
})
export class AppRoutingModule { }
Enter fullscreen mode Exit fullscreen mode

As you can see in our component we are going to call the methods of the SocketService and run the lifecycle of the websocket connection. The user interface is a simple form control that has a list of messages that come back.

Start up the service and the angular user interface and you should be able to access the configured route at http://localhost:4200/chat

Alt Text

When you access the page you should see our initial message "Welcome to the show angularuser" and an input box.

If we check out the logs we should see the initial connection being made:

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
WARN  [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
INFO  [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO  [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
Enter fullscreen mode Exit fullscreen mode

If you enter a message and hit Send, you should see the message being logged server-side:

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
WARN  [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
INFO  [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO  [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just sent us a message: "Good morning"
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-8) angularuser has just connected
Enter fullscreen mode Exit fullscreen mode

So far so good, but we want this to be more interactive. For the sake of this article, let's just echo back what the user sends:

@ServerEndpoint("/chat/{username}")
public class SocketEndpoint {

    private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
    private Map<String, Session> socketSessions = new HashMap<>();

    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {
        log.debug("{} has just connected", username);
        session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
        socketSessions.put(username, session);
    }

    @OnError
    public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
        log.error("{} encountered an error", username);
    }

    @OnMessage
    public void onMessage(String message, @PathParam("username") String username) {
        log.debug("{} has just sent us a message: {}", username, message);
        Session session = socketSessions.get(username);
        session.getAsyncRemote().sendText(message);
    }

    public void onClose(Session session, @PathParam("username") String username) {
        log.debug("{} has now disconnected", username);
    }
}
Enter fullscreen mode Exit fullscreen mode

In the update to the code, when the user connects, we are going to keep a reference of the Session in a HashMap indexed on the username. When a message comes in, we will lookup the session and then send the message back.

Alt Text

DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "Glad to be here"
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "What day is it?"
Enter fullscreen mode Exit fullscreen mode

In the next article I will demonstrate how to wire up Kafka to the WebSocket sessions and broadcast messages coming in from the Kafka queues.

Discussion (0)