Quick Background Context
Prayershub is an app that allows users to create playlists (called Worships) that contain songs, psalms, and prayers. A user can "Start" a Worship, which automatically plays through playlist's contents one-by-one.
In addition, users can "Broadcast" a Worship, which starts a 5-minute countdown, during which, other users can "Join" the worship. Upon the countdown's completion, the playlist starts simultaneously across all devices joined in.
The devices sync with the master controller (owner of the worship), who can go next/back/pause/resume or abort.
The state management of WorshipPage
comprises entirely of RxDart and StreamBuilders, along with WebSockets as the backend technology for "Broadcast".
The Current State
The WorshipPage
has a "socket" field of type Stream<WebSocket>
that replays the latest stream.
class _WorshipPageState extends State<WorshipPage> {
// ...
late ReplayStream<WebSocket> socket = WebSocket.connect(
"$websocketPath/live-worship/${widget.worship.id}",
headers: {"accesstoken": mainAccessToken},
).asStream().shareReplay(maxSize: 1);
// ...
}
Apparently, socket
was made a Stream<WebSocket>
to allow it to compose with other streams that would in turn be composed into more streams.
class _WorshipPageState extends State<WorshipPage> {
// ...
late ReplayStream<WebSocket> socket = WebSocket.connect(
"$websocketPath/live-worship/${widget.worship.id}",
headers: {"accesstoken": mainAccessToken},
).asStream().shareReplay(maxSize: 1);
late var socketMessages = socket.switchMap((s) => s.map((m) => m.toString())).share();
late var countdownEndedSocketMessage = socketMessages.where((p) => p == "countdown_ended").map((_) => true).share();
late var abortSocketMessage = socketMessages.where((p) => p == "abort").map((_) => true).share();
// ...
}
I personally believe there's better way to manage state than RxDart, and even with RxDart this could've been done better. However, it isn't all to bad and implementing reconnecting sockets shouldn't be too difficult.
Implementation
socketMessages
listens to socket
and maps the socket to a Stream<string>
messages with switchMap
. The switchMap
operator means that when a new socket is passed through the socket
stream, socketMessages
will stop listening to the old WebSocket
and will listen to the new WebSocket
.
That's perfect!
All we need to do is have the socket
stream emit a new WebSocket
every time the old WebSocket
disconnects.
class _WorshipPageState extends State<WorshipPage> {
late ReplayStream<WebSocket> socket =
createReconnectingSocket().shareReplay(maxSize: 1);
Stream<WebSocket> createReconnectingSocket() async* {
while (true) {
var socket = await WebSocket.connect(
"$websocketPath/live-worship/${widget.worship.id}",
headers: {"accesstoken": mainAccessToken},
);
yield socket;
await socket.onDisconnected;
}
}
}
Only one issue, onDisconnected
is not a real property of WebSocket
. Fortunately, doOnDone
is, and we can change our code to:
Stream<WebSocket> createReconnectingSocket() async* {
while (true) {
var socket = await WebSocket.connect(
"$websocketPath/live-worship/${widget.worship.id}",
headers: {"accesstoken": mainAccessToken},
);
yield socket;
await socket.doOnDone(() {}).first;
}
}
However, starting up the app immediately panics with Bad state: Stream has already been listened to
. Logs reveal this line as the problem: await socket.doOnDone(() {}).first;
Because WebSocket
implements Stream<dynamic>
, we can call listen
and doOnData
methods on it. Therefore, the steps to the exception can be modeled as followed.
- We yield socket in
createReconnectingSocket
- The
socket
stream emits aWebSocket
-
socketMessages
emits a Stream mapping overWebSocket
, broadcasting the result - Multiple StreamBuilder(s) listen to
socketMessages
-
doOnDone()
listens to theWebSocket
(already listened to by step #4 through a broadcast), and crashes the process
To get around this, we can broadcast the WebSocket
, and then emit that broadcasted result, as well as wait for broadcast stream to end (which signals the WebSocket has closed/terminated.
Stream<WebSocket> createReconnectingSocket() async* {
while (true) {
var socket = await WebSocket.connect(
"$websocketPath/live-worship/${widget.worship.id}",
headers: {"accesstoken": mainAccessToken},
);
// ↓↓↓↓↓
var broadcast = socket.asBroadcastStream();
yield broadcast;
await broadcast.doOnDone((){}).first;
}
}
However, the Dart LSP complains:
A yielded value of type 'Stream<dynamic>' must be assignable to 'WebSocket'
Except the stream returned by asBroadcastStream
cannot be casted to a WebSocket
, simply because WebSocket
is more than just a Stream.
But wait! We've practically achieved our objective haven't we? Sure we're not re-emitting WebSocket
, but we are re-emitting Streams of messages Stream<String>
, which is all the consumers need.
We can change createReconnectingSocket
signature to:
// Stream<WebSocket> createReconnectingSocket() async* {
Stream<Stream<String>> createReconnectingSocket() async* {
Now the component looks like this:
class _WorshipPageState extends State<WorshipPage> {
late ReplayStream<Stream<String>> socket =
createReconnectingSocket().shareReplay(maxSize: 1);
late var socketMessages = socket.switchMap((messages$) => messages$).share();
Stream<Stream<String>> createReconnectingSocket() async* {
while (true) {
var socket = await WebSocket.connect(
"$websocketPath/live-worship/${widget.worship.id}",
headers: {"accesstoken": mainAccessToken},
);
var broadcast = socket.asBroadcastStream().map((s) => s.toString());
yield broadcast;
await broadcast.doOnDone(() {}).first;
}
}
}
At this point, all errors disappear, except for one. At the very bottom of the file:
@override
void dispose() {
socket.listen((ws) => ws.close()); // fails to build
WakelockPlus.disable();
for (var sub in subs) {
sub.cancel();
}
audio.dispose();
soloAudio.dispose();
super.dispose();
}
Originally, listening to Stream<WebSocket> socket
would immediately emit a WebSocket
that dispose
would close
. However, now that socket
is a Stream<Stream<String>>
, we lose that functionality.
What we could do is have createReconnectingSocket
return both the socket and stream. Alternatively, createReconnectingSocket
can accept a close
stream to check if it receives the close message before the socket closes.
If close
emits an item first, then we close the WebSocket and return. Now dispose
just needs to send an event to close
(of type Subject<void>
).
Possible Improvements
You can very neatly reconnect WebSockets using Dart's async generators and RxDart. However, there are others ways of going about it depending on want you want.
For example, if you'd like the user to know if the WebSocket is connecting
, connected
, or failed
, you might create a Stream<Future<WebSocket>> websocket
along with publishReplay()
.
Each this Stream<Future<WebSocket>>
emits a Future<WebSocket>
, that represents a new attempt to connect a WebSocket.
When that emitted Future completes successfully, that represents a connected
WebSocket. If the future fails, that represents a failed
connection.
If you append publishReplay
to this Stream<Future<WebSocket>>
, thenconnect
it in initState
, late subscribers to this stream will get the latest Future<WebSocket>
.
If that Future<WebSocket>
has already completed, then the late subscribers immediately receives the WebSocket
, which will be the latest established WebSocket
.
Need Help?
I work with RxDart and Flutter on a daily basis, so I'm willing to help. You can either put your requests in the comments or contact me at bookofcooks123@gmail.com.
I also work on backends in JavaScript, PHP, or Golang, so I'm open to questions on working with WebSockets in those areas if needed.
Top comments (0)