DEV Community

BookOfCooks
BookOfCooks

Posted on

Reconnecting Websockets with RxDart

Quick Background Context

Prayershub is an app that allows users to create playlists (called Worships) that contain songs, psalms, and prayers. A user can "Start" a Worship, which automatically plays through playlist's contents one-by-one.

In addition, users can "Broadcast" a Worship, which starts a 5-minute countdown, during which, other users can "Join" the worship. Upon the countdown's completion, the playlist starts simultaneously across all devices joined in.

The devices sync with the master controller (owner of the worship), who can go next/back/pause/resume or abort.

The state management of WorshipPage comprises entirely of RxDart and StreamBuilders, along with WebSockets as the backend technology for "Broadcast".

The Current State

The WorshipPage has a "socket" field of type Stream<WebSocket> that replays the latest stream.

class _WorshipPageState extends State<WorshipPage> {
  // ...
  late ReplayStream<WebSocket> socket = WebSocket.connect(
    "$websocketPath/live-worship/${widget.worship.id}",
    headers: {"accesstoken": mainAccessToken},
  ).asStream().shareReplay(maxSize: 1);
  // ...
}
Enter fullscreen mode Exit fullscreen mode

Apparently, socket was made a Stream<WebSocket> to allow it to compose with other streams that would in turn be composed into more streams.

class _WorshipPageState extends State<WorshipPage> {
  // ...
  late ReplayStream<WebSocket> socket = WebSocket.connect(
    "$websocketPath/live-worship/${widget.worship.id}",
    headers: {"accesstoken": mainAccessToken},
  ).asStream().shareReplay(maxSize: 1);

  late var socketMessages = socket.switchMap((s) => s.map((m) => m.toString())).share();

  late var countdownEndedSocketMessage = socketMessages.where((p) => p == "countdown_ended").map((_) => true).share();
  late var abortSocketMessage = socketMessages.where((p) => p == "abort").map((_) => true).share();
  // ...
}
Enter fullscreen mode Exit fullscreen mode

I personally believe there's better way to manage state than RxDart, and even with RxDart this could've been done better. However, it isn't all to bad and implementing reconnecting sockets shouldn't be too difficult.

Implementation

socketMessages listens to socket and maps the socket to a Stream<string> messages with switchMap. The switchMap operator means that when a new socket is passed through the socket stream, socketMessages will stop listening to the old WebSocket and will listen to the new WebSocket.

That's perfect!

All we need to do is have the socket stream emit a new WebSocket every time the old WebSocket disconnects.

class _WorshipPageState extends State<WorshipPage> {
  late ReplayStream<WebSocket> socket =
      createReconnectingSocket().shareReplay(maxSize: 1);

  Stream<WebSocket> createReconnectingSocket() async* {
    while (true) {
      var socket = await WebSocket.connect(
        "$websocketPath/live-worship/${widget.worship.id}",
        headers: {"accesstoken": mainAccessToken},
      );

      yield socket;

      await socket.onDisconnected;
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Only one issue, onDisconnected is not a real property of WebSocket. Fortunately, doOnDone is, and we can change our code to:

  Stream<WebSocket> createReconnectingSocket() async* {
    while (true) {
      var socket = await WebSocket.connect(
        "$websocketPath/live-worship/${widget.worship.id}",
        headers: {"accesstoken": mainAccessToken},
      );

      yield socket;

      await socket.doOnDone(() {}).first;
    }
  }
Enter fullscreen mode Exit fullscreen mode

However, starting up the app immediately panics with Bad state: Stream has already been listened to. Logs reveal this line as the problem: await socket.doOnDone(() {}).first;

Because WebSocket implements Stream<dynamic>, we can call listen and doOnData methods on it. Therefore, the steps to the exception can be modeled as followed.

  1. We yield socket in createReconnectingSocket
  2. The socket stream emits a WebSocket
  3. socketMessages emits a Stream mapping over WebSocket, broadcasting the result
  4. Multiple StreamBuilder(s) listen to socketMessages
  5. doOnDone() listens to the WebSocket (already listened to by step #4 through a broadcast), and crashes the process

To get around this, we can broadcast the WebSocket, and then emit that broadcasted result, as well as wait for broadcast stream to end (which signals the WebSocket has closed/terminated.

  Stream<WebSocket> createReconnectingSocket() async* {
    while (true) {
      var socket = await WebSocket.connect(
        "$websocketPath/live-worship/${widget.worship.id}",
        headers: {"accesstoken": mainAccessToken},
      );

      // ↓↓↓↓↓
      var broadcast = socket.asBroadcastStream();

      yield broadcast;

      await broadcast.doOnDone((){}).first;
    }
  }
Enter fullscreen mode Exit fullscreen mode

However, the Dart LSP complains:

A yielded value of type 'Stream<dynamic>' must be assignable to 'WebSocket'
Enter fullscreen mode Exit fullscreen mode

Except the stream returned by asBroadcastStream cannot be casted to a WebSocket, simply because WebSocket is more than just a Stream.

But wait! We've practically achieved our objective haven't we? Sure we're not re-emitting WebSocket, but we are re-emitting Streams of messages Stream<String>, which is all the consumers need.

We can change createReconnectingSocket signature to:

// Stream<WebSocket> createReconnectingSocket() async* {
Stream<Stream<String>> createReconnectingSocket() async* {
Enter fullscreen mode Exit fullscreen mode

Now the component looks like this:

class  _WorshipPageState  extends  State<WorshipPage> {
  late ReplayStream<Stream<String>> socket =
      createReconnectingSocket().shareReplay(maxSize: 1);

  late var socketMessages = socket.switchMap((messages$) => messages$).share();

  Stream<Stream<String>> createReconnectingSocket() async* {
    while (true) {
      var socket = await WebSocket.connect(
        "$websocketPath/live-worship/${widget.worship.id}",
        headers: {"accesstoken": mainAccessToken},
      );

      var broadcast = socket.asBroadcastStream().map((s) => s.toString());

      yield broadcast;

      await broadcast.doOnDone(() {}).first;
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

At this point, all errors disappear, except for one. At the very bottom of the file:

 @override
 void dispose() {
   socket.listen((ws) => ws.close()); // fails to build
   WakelockPlus.disable();
   for (var sub in subs) {
     sub.cancel();
   }
   audio.dispose();
   soloAudio.dispose();
   super.dispose();
 }
Enter fullscreen mode Exit fullscreen mode

Originally, listening to Stream<WebSocket> socket would immediately emit a WebSocket that dispose would close. However, now that socket is a Stream<Stream<String>>, we lose that functionality.

What we could do is have createReconnectingSocket return both the socket and stream. Alternatively, createReconnectingSocket can accept a close stream to check if it receives the close message before the socket closes.

If close emits an item first, then we close the WebSocket and return. Now dispose just needs to send an event to close (of type Subject<void>).

Possible Improvements

You can very neatly reconnect WebSockets using Dart's async generators and RxDart. However, there are others ways of going about it depending on want you want.

For example, if you'd like the user to know if the WebSocket is connecting, connected, or failed, you might create a Stream<Future<WebSocket>> websocket along with publishReplay().

Each this Stream<Future<WebSocket>> emits a Future<WebSocket>, that represents a new attempt to connect a WebSocket.

When that emitted Future completes successfully, that represents a connected WebSocket. If the future fails, that represents a failed connection.

If you append publishReplay to this Stream<Future<WebSocket>> , thenconnect it in initState, late subscribers to this stream will get the latest Future<WebSocket>.

If that Future<WebSocket> has already completed, then the late subscribers immediately receives the WebSocket, which will be the latest established WebSocket.

Need Help?

I work with RxDart and Flutter on a daily basis, so I'm willing to help. You can either put your requests in the comments or contact me at bookofcooks123@gmail.com.

I also work on backends in JavaScript, PHP, or Golang, so I'm open to questions on working with WebSockets in those areas if needed.

Top comments (0)