henge pushes config to edge devices. Semantically, each device should connect once - except:
- What if a device is misconfigured with the details for another device?
- What happens if a device has disconnected, but we haven’t picked it up yet?
- How about if a device has a zombie connection and decides to reconnect?
- Any other cases I haven’t thought of.
So, we need to figure out how to handle a second connection from the same device.
Allow multiple connections
For each device, we track multiple connections. Instead of a map from device to a single channel, we’d have a map from device to a slice of channels ([]chan).
Every operation on a channel then becomes a loop over that slice.
Given that the correct case is almost always one connection per device, this option adds a lot of complexity, work and defect surface area.
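To make the "every operation becomes a loop" point concrete, here is a minimal sketch of what such a broker might look like. `MultiBroker`, `Event` and the method names are hypothetical stand-ins for illustration, not henge's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// Event stands in for henge.ConfigChangedEvent (assumed shape).
type Event struct {
	Version int
}

// MultiBroker is a hypothetical broker that allows several
// connections per device.
type MultiBroker struct {
	mu       sync.Mutex
	channels map[string][]chan Event
}

func NewMultiBroker() *MultiBroker {
	return &MultiBroker{channels: make(map[string][]chan Event)}
}

// Subscribe appends a new channel to the device's slice.
func (b *MultiBroker) Subscribe(deviceId string) chan Event {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan Event, 1)
	b.channels[deviceId] = append(b.channels[deviceId], ch)
	return ch
}

// Unsubscribe has to search the slice for the one channel to remove.
func (b *MultiBroker) Unsubscribe(deviceId string, ch chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	chans := b.channels[deviceId]
	for i, c := range chans {
		if c == ch {
			b.channels[deviceId] = append(chans[:i], chans[i+1:]...)
			break
		}
	}
	if len(b.channels[deviceId]) == 0 {
		delete(b.channels, deviceId)
	}
}

// Publish loops over every connection for the device and reports
// how many of them accepted the event.
func (b *MultiBroker) Publish(deviceId string, ev Event) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for _, ch := range b.channels[deviceId] {
		select {
		case ch <- ev:
			delivered++
		default: // slow consumer: drop rather than block the broker
		}
	}
	return delivered
}

func main() {
	b := NewMultiBroker()
	first := b.Subscribe("device-1")
	second := b.Subscribe("device-1")
	fmt.Println("delivered:", b.Publish("device-1", Event{Version: 1}))
	<-first
	<-second
	b.Unsubscribe("device-1", first)
	fmt.Println("delivered:", b.Publish("device-1", Event{Version: 2}))
}
```

Even in this small form, subscribe, unsubscribe and publish each need slice bookkeeping that the one-connection-per-device case never uses.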
Reject additional connections
If a device is already connected, we reject any additional connections.
This option is certainly simpler than allowing multiple connections. However, we are dealing with a network, and there are many reasons why a connection might not close properly.
To mitigate that, we’d have to allow the user to manually clear an open connection so that a device can reconnect.
The endpoint is easy enough to write, but requiring user intervention is not something I like having as part of a product.
We could add a heartbeat, and close the connection if we don’t receive a heartbeat within a configured amount of time. This reduces the reliance on user intervention by replacing it with wait time. Of course, it also adds complexity.
Close existing connection on reconnect
How about, on reconnect, we close any existing connection and serve the new request instead?
Instead of one chan per connection, we now have a Subscription:
    type Subscription struct {
        stream   chan henge.ConfigChangedEvent // actual stream of events
        closeCmd chan struct{}                 // on msg, unsubscribe
    }
And the connection code involves a little more of a dance:
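    func (c *ConfigEventBroker) Subscribe(deviceId string) *Subscription {
        c.mu.Lock()
        defer c.mu.Unlock()

        // Every connection gets its own Subscription, so the close signal
        // below can only be seen by the connection being replaced.
        sub := &Subscription{
            stream:   make(chan henge.ConfigChangedEvent, 1),
            closeCmd: make(chan struct{}, 1),
        }
        if prev, found := c.channels[deviceId]; found {
            // already connected: close the previous connection.
            // closeCmd is buffered and each Subscription is signalled at
            // most once, so this send never blocks while we hold the lock.
            prev.closeCmd <- struct{}{}
        }
        c.channels[deviceId] = sub
        return sub
    }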
The connection handler itself respects the close command:
    for {
        select {
        case snap := <-sub.stream:
            // new config for this device: push it down the SSE stream
            err = sendSSEEvent(w, f, Snapshot{
                Version: snap.Version,
                Values:  snap.Config,
            })
            if err != nil {
                slog.Warn("error while emitting sse event", "err", err)
                return
            }
        case <-sub.closeCmd:
            // client reconnected. We can close this one
            return
        case <-req.Context().Done():
            // client went away: clean up and mark the device disconnected
            r.broker.Unsubscribe(deviceId)
            err = r.devices.SetConnected(deviceId, false)
            if err != nil {
                slog.Warn("unable to set connect status to disconnected", "err", err)
            }
            return
        }
    }
With this approach, we don't have to:
- Track multiple connections
- Have heartbeats
- Require user intervention
In the event of a rapid flurry of reconnects coinciding with a slow handler, it
should still behave correctly, albeit more slowly. I'm accepting this rare edge
case: if a device is reconnecting that quickly to a slow handler, there are
probably much bigger problems at play.
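To make the moving parts easier to poke at in isolation, here is a minimal, self-contained sketch of the close-on-reconnect idea. It is a variation under stated assumptions rather than henge's actual code: `Broker` and `Event` are stand-ins, each connection gets its own `Subscription`, and `Unsubscribe` takes the subscription as a second (hypothetical) argument so that a stale connection cannot remove the one that replaced it.

```go
package main

import (
	"fmt"
	"sync"
)

// Event stands in for henge.ConfigChangedEvent (assumed shape).
type Event struct {
	Version int
}

type Subscription struct {
	stream   chan Event    // actual stream of events
	closeCmd chan struct{} // on msg, the handler should return
}

// Broker is a hypothetical stand-in for ConfigEventBroker.
type Broker struct {
	mu       sync.Mutex
	channels map[string]*Subscription
}

func NewBroker() *Broker {
	return &Broker{channels: make(map[string]*Subscription)}
}

// Subscribe registers a fresh subscription for the device and tells
// any previous one to close.
func (b *Broker) Subscribe(deviceId string) *Subscription {
	b.mu.Lock()
	defer b.mu.Unlock()
	sub := &Subscription{
		stream:   make(chan Event, 1),
		closeCmd: make(chan struct{}, 1),
	}
	if prev, found := b.channels[deviceId]; found {
		// Buffered, and signalled at most once per subscription,
		// so this send cannot block while the lock is held.
		prev.closeCmd <- struct{}{}
	}
	b.channels[deviceId] = sub
	return sub
}

// Unsubscribe removes sub only if it is still the device's current
// subscription, and reports whether it removed anything.
func (b *Broker) Unsubscribe(deviceId string, sub *Subscription) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.channels[deviceId] != sub {
		return false // a newer connection has already taken over
	}
	delete(b.channels, deviceId)
	return true
}

func main() {
	b := NewBroker()
	old := b.Subscribe("device-1")
	current := b.Subscribe("device-1") // the device reconnects

	<-old.closeCmd // the old handler would return at this point

	fmt.Println("stale unsubscribe removed something:", b.Unsubscribe("device-1", old))
	fmt.Println("current unsubscribe removed something:", b.Unsubscribe("device-1", current))
}
```

In a handler built on this sketch, the boolean returned by `Unsubscribe` could also decide whether to mark the device as disconnected, so a late cleanup from an old connection does not flip the status of a device that has already reconnected.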