dmitryvz

Server-sent events in NestJS in depth

Server-sent events (SSE) offer an efficient way to implement real-time updates in web applications by allowing servers to push data to connected clients.

Traditionally, a web page has to send a request to the server to receive new data; that is, the page requests data from the server. With server-sent events, it's possible for a server to send new data to a web page at any time, by pushing messages to the web page. (MDN)

In other words, the client opens a long-lived HTTP connection to the server, and the server uses it to send data to the client whenever it wants. All modern browsers support SSE natively through the EventSource API, so you don't need an extra JS library.
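To illustrate the client side, here is a minimal sketch of consuming such a stream. The `EventSourceLike` interface and the `collectMessages` helper are hypothetical names introduced only for this example (the interface covers just the parts of the browser's EventSource that are used), and the `/sse` path is an assumption matching the route used later in this article. The sketch also assumes that payloads arrive as JSON strings.

```typescript
// Minimal structural type covering the parts of the browser's EventSource used here.
interface EventSourceLike {
  addEventListener(type: string, listener: (event: { data: string }) => void): void;
  close(): void;
}

// Subscribes to unnamed ("message") events and hands the parsed JSON payload to onData.
// Returns a function that closes the connection.
function collectMessages<T>(
  source: EventSourceLike,
  onData: (payload: T) => void,
): () => void {
  source.addEventListener('message', (event) => {
    onData(JSON.parse(event.data) as T);
  });
  return () => source.close();
}

// In a browser this could be used roughly like so:
// const stop = collectMessages(new EventSource('/sse'), (payload) => console.log(payload));
```

Note that events sent with a custom type (such as 'new-message' later in this article) are not delivered to the 'message' listener; the client has to call addEventListener with that type name instead.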

We use SSE at ClapDuel to update counters on the page in real time.

NestJS offers SSE out of the box. Here is an example from the docs:

@Sse('sse')
sse(): Observable<any> {
  return interval(1000).pipe(map((_) => ({ data: { hello: 'world' } })));
}

Basically, you annotate a controller method with the @Sse decorator and return an RxJS Observable from it.

In a real application, you'll use SSE to send data when specific events occur. For instance, in a chat application, you'd send information about a new message to all connected users when a new message is posted.

[Figure: SSE implementation diagram]

In the context of NestJS, this means that a user is sending data to Action A while all users are connected to the SSE action. You need to inform the SSE action that something has happened in Action A. This can be done in many ways. In this article we will explore two solutions:

  • RxJS observables
  • EventEmitter

Here's an example that extends the one from the documentation, using observables:

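// One shared stream; every connected client subscribes to it.
// MessageEvent here is the interface exported by @nestjs/common, not the DOM type.
private readonly sseStream = new Subject<MessageEvent>();

@Post()
actionA() {
  // Push an event to all currently connected clients.
  this.sseStream.next({ type: 'new-message', data: 'Hello' });
}

@Sse('sse')
sse(): Observable<MessageEvent> {
  return this.sseStream.asObservable();
}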

Although very simple, this approach has one major drawback: by the time the SSE action runs, the connection is already established, so you cannot, for example, decline it with a 403 or 404 error.

In the next example we will implement SSE manually. NestJS makes the manual implementation more straightforward thanks to its built-in SseStream class.

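// SseStream is not re-exported from the package index; this sketch assumes
// it is imported from '@nestjs/core/router/sse-stream'.
@Get('sse')
sse(@Req() request, @Res() response) {
  const stream = new SseStream(request);
  // pipe() sends the SSE headers and starts streaming to the client.
  return stream.pipe(response);
}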

Note that we have replaced the @Sse decorator with the @Get decorator. Now we have full control over the request and response, which makes it possible to reject incoming requests and to close the connection with the client when necessary.
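As a rough sketch of what rejecting a request could look like, the helper below answers with a 403 before any SSE headers are written. It is framework-free on purpose: `ResponseLike` and `rejectUnlessAllowed` are hypothetical names for this example, and the `isAllowed` flag stands in for whatever access check your application performs.

```typescript
// Minimal structural type for the parts of a Node.js HTTP response used here.
interface ResponseLike {
  writeHead(statusCode: number, headers?: Record<string, string>): unknown;
  end(): unknown;
}

// Hypothetical helper: rejects the request with 403 when access is denied.
// Returns true when the caller may go on to create and pipe the SSE stream.
function rejectUnlessAllowed(response: ResponseLike, isAllowed: boolean): boolean {
  if (!isAllowed) {
    // No SSE headers have been sent yet, so a regular error status is still possible.
    response.writeHead(403);
    response.end();
    return false;
  }
  return true;
}

// Inside the @Get('sse') handler, before creating the stream
// (userCanSubscribe is an assumed, application-specific check):
// if (!rejectUnlessAllowed(response, userCanSubscribe)) return;
```

The important part is the ordering: the check has to happen before the stream is piped to the response, because piping is what sends the SSE headers.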

The SseStream class is just a transform stream that sets all the headers required for SSE and turns the data you write into it into valid SSE messages.

Now let's see how we can rewrite the example using EventEmitter.
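To make "valid SSE message" concrete, here is a small sketch of the text/event-stream wire format. This is not the actual SseStream code, only an illustration of the format; `SseMessage` and `formatSseMessage` are hypothetical names chosen to mirror the { type, data } objects used in this article.

```typescript
// Hypothetical message shape for this example.
interface SseMessage {
  data: string | object;
  type?: string;  // becomes the "event:" field
  id?: string;    // becomes the "id:" field
  retry?: number; // reconnection delay in milliseconds
}

// Serializes one message into the text/event-stream wire format.
function formatSseMessage(message: SseMessage): string {
  const lines: string[] = [];
  if (message.type) lines.push(`event: ${message.type}`);
  if (message.id) lines.push(`id: ${message.id}`);
  if (message.retry !== undefined) lines.push(`retry: ${message.retry}`);

  const payload =
    typeof message.data === 'string' ? message.data : JSON.stringify(message.data);
  // Each line of a multi-line payload needs its own "data:" prefix.
  for (const line of payload.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the message.
  return lines.join('\n') + '\n\n';
}

// formatSseMessage({ type: 'new-message', data: 'Hello' })
// produces "event: new-message\ndata: Hello\n\n"
```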

private readonly eventStream = new EventEmitter();

@Post()
actionA() {
  // Notify every connected SSE client about the new message.
  this.eventStream.emit('new-message', 'Hello');
}

@Get('sse')
sse(@Req() request, @Res() response) {
  const sseStream = new SseStream(request);

  // Forward each emitted event to this client's stream.
  const onNewMessage = (data) => {
    sseStream.write({ type: 'new-message', data });
  };
  this.eventStream.on('new-message', onNewMessage);

  // Remove the listener once the client disconnects.
  response.on('close', () => {
    this.eventStream.off('new-message', onNewMessage);
  });

  return sseStream.pipe(response);
}

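Pay attention to the response.on('close') line: it removes the listener when the connection is closed, which is important for freeing up resources. Don't worry if you encounter a "MaxListenersExceededWarning: Possible EventEmitter memory leak detected" warning. By default Node.js prints it when more than 10 listeners are registered for a single event, and here every connected client adds one. As long as listeners are removed on close, it does not indicate a real leak, and you can raise the limit with setMaxListeners.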
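The listener bookkeeping can be sketched in isolation, without NestJS. In the snippet below, `subscribe` is a hypothetical helper, and the limit of 100 is an arbitrary assumption that you would replace with the number of concurrent clients you expect.

```typescript
import { EventEmitter } from 'node:events';

const eventStream = new EventEmitter();
// Each connected client adds one listener, so raise the default limit of 10.
// The value 100 is an arbitrary assumption for this sketch.
eventStream.setMaxListeners(100);

// Registers a per-connection listener and returns a cleanup function,
// which should be called when the connection closes.
function subscribe(onMessage: (data: string) => void): () => void {
  eventStream.on('new-message', onMessage);
  return () => {
    eventStream.off('new-message', onMessage);
  };
}
```

Calling the returned function from the response's 'close' handler keeps the listener count equal to the number of open connections.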

Conclusion

NestJS simplifies the integration of SSE into your applications, offering both the built-in @Sse decorator and the SseStream class for a manual implementation.
