<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Egemen Göl</title>
    <description>The latest articles on DEV Community by Egemen Göl (@egemengol).</description>
    <link>https://dev.to/egemengol</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F779634%2F64441403-c1c6-4c52-a295-5f4e9f4185c9.jpeg</url>
      <title>DEV Community: Egemen Göl</title>
      <link>https://dev.to/egemengol</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/egemengol"/>
    <language>en</language>
    <item>
      <title>Building a WebSocket Chatroom using Golang and Spread the PubSub Library</title>
      <dc:creator>Egemen Göl</dc:creator>
      <pubDate>Thu, 07 Mar 2024 13:15:06 +0000</pubDate>
      <link>https://dev.to/egemengol/building-a-websocket-chatroom-using-golang-and-spread-the-pubsub-library-2n03</link>
      <guid>https://dev.to/egemengol/building-a-websocket-chatroom-using-golang-and-spread-the-pubsub-library-2n03</guid>
      <description>&lt;p&gt;We are trying to build a websocket based chatroom.&lt;/p&gt;

&lt;p&gt;There are two excellent examples of websocket based chatrooms already, let's go over them.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://github.com/nhooyr/websocket/blob/master/internal/examples/chat/chat.go"&gt;nhooyr/websocket example&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The server struct holds a set of subscriber structs, each consisting of a &lt;code&gt;chan []byte&lt;/code&gt; while knowing how to close themselves.&lt;br&gt;&lt;br&gt;
The server manages access to this set via a mutex and helper functions to keep it current for all of the subscribers coming and going.&lt;/p&gt;



&lt;p&gt;&lt;a href="https://github.com/gorilla/websocket/blob/main/examples/chat/hub.go"&gt;gorilla/websocket example&lt;/a&gt;  &lt;/p&gt;

&lt;p&gt;There is a &lt;code&gt;Hub&lt;/code&gt; structure that keeps a set of &lt;code&gt;Client&lt;/code&gt; structs which point to the &lt;code&gt;Hub&lt;/code&gt; and a channel, and manages access to them via exposed channels. &lt;br&gt;
The clients, however, send pointers of themselves over these channels whenever they want to come and go.&lt;br&gt;&lt;br&gt;
The &lt;code&gt;Hub&lt;/code&gt; and &lt;code&gt;Client&lt;/code&gt; structures are highly coupled, and the logic is &lt;em&gt;spread&lt;/em&gt; (pun intended) over two files.&lt;/p&gt;


&lt;h4&gt;
  
  
  Our Plan
&lt;/h4&gt;

&lt;p&gt;We will use:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://github.com/nhooyr/websocket"&gt;nhooyr/websocket&lt;/a&gt; for handling websockets.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/egemengol/spread"&gt;egemengol/spread&lt;/a&gt; for handling the PubSub between multiple websocket connections, with clean and obvious code.&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Message Struct
&lt;/h3&gt;

&lt;p&gt;Let's create our &lt;code&gt;Message&lt;/code&gt; type. &lt;/p&gt;

&lt;p&gt;This struct will be the "message" type of the PubSub topic later.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Username&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="s"&gt;`json:"name"`&lt;/span&gt;
    &lt;span class="n"&gt;Message&lt;/span&gt;  &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="s"&gt;`json:"msg"`&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Publish HTTP Handler
&lt;/h3&gt;

&lt;p&gt;Creating a &lt;code&gt;http.Handler&lt;/code&gt; for publishing messages to the PubSub topic is easy.&lt;/p&gt;

&lt;p&gt;The clients will use it by sending &lt;code&gt;HTTP POST&lt;/code&gt; requests to the &lt;code&gt;/publish&lt;/code&gt; endpoint&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;We read the incoming websocket message body and parse it into our &lt;code&gt;Message&lt;/code&gt; struct.&lt;/li&gt;
&lt;li&gt;We call the &lt;code&gt;spread.Topic.Publish&lt;/code&gt; method with our &lt;code&gt;Message&lt;/code&gt;.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;When we call this function with its required dependencies (logger and topic), we will obtain a &lt;code&gt;http.Handler&lt;/code&gt;. We will pass that to &lt;code&gt;http.ServeMux&lt;/code&gt; for serving, later.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;HandlePublish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;spread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Handler&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HandlerFunc&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;w&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ResponseWriter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Request&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewDecoder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Body&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Warn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"error decoding message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"err"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StatusBadRequest&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Body&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"publishing message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"msg"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"error publishing message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"err"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StatusInternalServerError&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;WriteHeader&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StatusNoContent&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you can see from the accepted &lt;code&gt;topic&lt;/code&gt; argument, the topic knows and restricts the message types it broadcasts. Only &lt;code&gt;Message&lt;/code&gt; structs are allowed into this particular topic.&lt;/p&gt;

&lt;p&gt;Also, the handler does not know or care anything about its subscribers, it blissfully fires and forgets.&lt;/p&gt;

&lt;h3&gt;
  
  
  Subscribe Websocket Handler
&lt;/h3&gt;

&lt;p&gt;The clients will connect to this endpoint by making a connection to &lt;code&gt;ws://localhost:8000/subscribe&lt;/code&gt; endpoint with their WebSocket library. &lt;/p&gt;

&lt;p&gt;Our chatroom implementation chooses to use this websocket connection only for sending messages from the server to the client, even though it could potentially use it in both ways. Makes the implementation and error handling easier. Also, the writer of our websocket library has chosen to implement the chat functionality in this way.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;We will upgrade the incoming request to a WebSocket connection by calling &lt;code&gt;Accept&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;We will subscribe to the topic, by getting a &lt;code&gt;&amp;lt;-chan Message&lt;/code&gt; from it.&lt;/li&gt;
&lt;li&gt;We loop through the messages and write them to the client.
We keep in mind that at any point, client can disconnect, or topic can be cancelled (on shutdown). We handle these cases in our loop.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;HandleSubscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;spread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Handler&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HandlerFunc&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;w&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ResponseWriter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Request&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;websocket&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Accept&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;w&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"error accepting websocket"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"err"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CloseNow&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="n"&gt;recvChan&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;removeRecvChan&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetRecvChannel&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;20&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"error getting recv channel"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"err"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;removeRecvChan&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CloseRead&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Done&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"client disconnected"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"err"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt;
            &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt;&lt;span class="n"&gt;recvChan&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;ok&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"recv channel closed"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;websocket&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StatusGoingAway&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;""&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="k"&gt;return&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Marshal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"error marshaling message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"err"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="k"&gt;return&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Write&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;websocket&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;MessageText&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Warn&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"error writing message"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"err"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="k"&gt;return&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="s"&gt;"forwarded to listener"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
                    &lt;span class="s"&gt;"fromUser"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Username&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
                    &lt;span class="s"&gt;"msg"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;
                &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;})&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When we subscribe to the topic by requesting a read channel, a copy of all of the messages published to the topic start to getting sent to the newly created channel. We are responsible for closing it via the returned destructor function.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;code&gt;ctx := conn.CloseRead(r.Context())&lt;/code&gt;&lt;br&gt;&lt;br&gt;
Our websocket library detects the disconnected clients, only when it tries to read or write.&lt;br&gt;&lt;br&gt;
Since we know we do not read anything from client, we use this helper function to create a context, which gets cancelled when the client disconnects.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Construct http.Server
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;We create a logger to be used by both our handlers and our topic.&lt;/li&gt;
&lt;li&gt;We create a topic. It needs:

&lt;ul&gt;
&lt;li&gt;The type of the message it will carry, using generics.&lt;/li&gt;
&lt;li&gt;A context, for easy cancellation&lt;/li&gt;
&lt;li&gt;An optional logger, mainly for debug purposes.&lt;/li&gt;
&lt;li&gt;A channel size for the publishers, they will block if the channel is full.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;We populate the routes, via the handler factories defined above. We give each their dependencies. We then create the http server.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;Run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;New&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewTextHandler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Stderr&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HandlerOptions&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;Level&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;LevelInfo&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;spread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewTopic&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;](&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="m"&gt;20&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;mux&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewServeMux&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;mux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"POST /publish"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;HandlePublish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="n"&gt;mux&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"/subscribe"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;HandleSubscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

    &lt;span class="n"&gt;httpServer&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;http&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Server&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Addr&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;         &lt;span class="s"&gt;"localhost:8000"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;Handler&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;      &lt;span class="n"&gt;mux&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;ReadTimeout&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;  &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;WriteTimeout&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="m"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="c"&gt;// &lt;/span&gt;
    &lt;span class="c"&gt;// We will run the server here&lt;/span&gt;
    &lt;span class="c"&gt;//&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Background&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;Run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Fatal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Shutdown
&lt;/h3&gt;

&lt;p&gt;Graceful shutdown is the hardest part of a http application in my opinion, especially a websocket enabled one.&lt;/p&gt;

&lt;p&gt;We will implement an optimist but brutal shutdown mechanism here.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Get a context that gets cancelled when an interrupt is received.&lt;/li&gt;
&lt;li&gt;Spawn a goroutine that waits on that context. 

&lt;ol&gt;
&lt;li&gt;Wait for the shutdown of websocket handlers, which &lt;code&gt;Shutdown&lt;/code&gt; does not close.&lt;/li&gt;
&lt;li&gt;Shutdown the http server. The timeout is for the regular connections to close, in this case serving an &lt;code&gt;index.html&lt;/code&gt;.&lt;/li&gt;
&lt;/ol&gt;


&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;About the wait of the handlers:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The topic is aware of the outer context, which gets cancelled on an interrupt.
&lt;/li&gt;
&lt;li&gt;The topic notifies any channel based subscribers by closing their receive channels.
&lt;/li&gt;
&lt;li&gt;The handlers return when they see their recv channels close.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;Run&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c"&gt;// Connect the context to interrupts.&lt;/span&gt;
    &lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancel&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;signal&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NotifyContext&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Interrupt&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;cancel&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="c"&gt;//&lt;/span&gt;
    &lt;span class="c"&gt;// Rest of the Run function&lt;/span&gt;
    &lt;span class="c"&gt;//&lt;/span&gt;

    &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c"&gt;// Wait for the context be notified of an interrupt&lt;/span&gt;
        &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Done&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="c"&gt;// Topic is already listening to the context,&lt;/span&gt;
        &lt;span class="c"&gt;// we know it will send close signals to the handlers&lt;/span&gt;
        &lt;span class="c"&gt;// We wait for them to return for a bit&lt;/span&gt;
        &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;100&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Millisecond&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c"&gt;// Give the server time to close all the connections&lt;/span&gt;
        &lt;span class="n"&gt;timeoutCtx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancel&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;WithTimeout&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Background&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="m"&gt;100&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Millisecond&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;cancel&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;httpServer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Shutdown&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeoutCtx&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"error closing http server"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"err"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}()&lt;/span&gt;

    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"http server started listening on"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"addr"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;httpServer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Addr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;httpServer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ListenAndServe&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;In principle, we could share a &lt;code&gt;sync.WaitGroup&lt;/code&gt; between subscribe handlers, and wait on it instead of sleeping a fixed amount. The drawback would be waiting while blocking, which would hang on a non-progressing websocket handler, which is not good. We could wrap that blocking wait inside a goroutine and a channel, with a timeout around them. &lt;/p&gt;

&lt;p&gt;It is outside of the scope of this article, which is demonstrating how easy PubSub can be in Golang. The wait here is good enough, the remaining websocket connections are dropped and clients get notified via their websocket libraries anyway.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;h3&gt;
  
  
  Conclusion
&lt;/h3&gt;

&lt;p&gt;The application may look complicated, but if we squint hard enough we can see it is mainly boilerplate whenever one writes http servers in golang via the standard library. The &lt;code&gt;/subscribe&lt;/code&gt; part is typical for a listening websocket. &lt;/p&gt;

&lt;p&gt;When we compare the approach here with the official examples, how obvious the code becomes, pushing all the synchronization work to the &lt;a href="https://github.com/egemengol/spread"&gt;egemengol/spread&lt;/a&gt; library.&lt;/p&gt;

&lt;p&gt;You can see the fully working implementation, along with &lt;a href="https://github.com/egemengol/spread/tree/main/examples/chatroom/index.html"&gt;a simple UI&lt;/a&gt; at &lt;a href="https://github.com/egemengol/spread/tree/main/examples/chatroom"&gt;spread examples directory&lt;/a&gt;.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Introducing Spread - Ergonomic PubSub/EventBus in Golang</title>
      <dc:creator>Egemen Göl</dc:creator>
      <pubDate>Mon, 26 Feb 2024 21:34:40 +0000</pubDate>
      <link>https://dev.to/egemengol/introducing-spread-ergonomic-pubsubeventbus-in-golang-4jnl</link>
      <guid>https://dev.to/egemengol/introducing-spread-ergonomic-pubsubeventbus-in-golang-4jnl</guid>
      <description>&lt;h3&gt;
  
  
  Why
&lt;/h3&gt;

&lt;p&gt;When one wants pub/sub functionality in a Golang application, there are mainly two choices.&lt;/p&gt;

&lt;p&gt;One: You bring a &lt;strong&gt;&lt;em&gt;Big Boi&lt;/em&gt;&lt;/strong&gt; like &lt;em&gt;RabbitMQ, Kafka, Redis&lt;/em&gt; or some other messaging solution. They are battle-tested and scalable, however they come with some drawbacks:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;u&gt;Development complexity&lt;/u&gt;:
Every developer needs to know the sdk, semantics and performance characteristics of given message broker.&lt;/li&gt;
&lt;li&gt;
&lt;u&gt;Deployment complexity&lt;/u&gt;:
If you are hosting the broker yourself, there is another service you need to manage and keep track of, as well as creating a repeatable development environment in your system for you and your colleagues.&lt;/li&gt;
&lt;li&gt;
&lt;u&gt;Expensive&lt;/u&gt;:
In terms of cloud costs or energy consumption, it is quite a bit heavier than your standalone Golang application.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Two: You implement a custom solution, like the chatroom examples of &lt;a href="https://github.com/gorilla/websocket/blob/main/examples/chat/hub.go"&gt;gorilla/websocket&lt;/a&gt; or &lt;a href="https://github.com/nhooyr/websocket/blob/master/internal/examples/chat/chat.go"&gt;nhooyr/websocket&lt;/a&gt;. It works well, you use no external dependencies in typical Gopher fashion, but:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;u&gt;Error prone&lt;/u&gt;:
It is notoriously easy to miss subtle bugs in concurrent code. We use Golang which makes things so much easier, it is still a problem&lt;/li&gt;
&lt;li&gt;
&lt;u&gt;Tight coupling&lt;/u&gt;:
If you are using channels, opening and closing channels in a correct manner is hard (&lt;code&gt;malloc&lt;/code&gt; and &lt;code&gt;free&lt;/code&gt; vibes anyone?) and requires synchronization between producers and consumers.
It makes your application hard to modify as well, since you never know if you introduced a new bug, too subtle to catch, right away.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;There are a lot of problems that live in between. I will go out of a limb and will claim that %90 of startup ideas and side projects will never grow out of a singular beefy multicore server that hosts a well-written Golang monolith. In the case you do, you have wonderful problems, have at thee with the &lt;strong&gt;&lt;em&gt;Big Bois&lt;/em&gt;&lt;/strong&gt;, but only then.&lt;/p&gt;

&lt;p&gt;There is the really nice &lt;a href="https://github.com/asaskevich/EventBus"&gt;asaskevich/EventBus&lt;/a&gt; library which covers a lot, and is the main inspiration of this library.&lt;br&gt;&lt;br&gt;
A couple minor notes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Unmaintained in the last 4 years.&lt;/li&gt;
&lt;li&gt;Uses mutexes, hard to grasp what is going on.&lt;/li&gt;
&lt;li&gt;Precedes generics, interfaces are clunky to work with.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;So &lt;a href="https://github.com/egemengol/spread"&gt;Spread&lt;/a&gt;, this library here, is a&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Single process and in-memory,&lt;/li&gt;
&lt;li&gt;PubSub / EventBus / Broadcast / Fanout
library with&lt;/li&gt;
&lt;li&gt;Ergonomic, type-safe &lt;em&gt;topics&lt;/em&gt; implemented using generics,&lt;/li&gt;
&lt;li&gt;Based on channels.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Intended to be useful for:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;In-memory pipelines with persistence &lt;em&gt;==&amp;gt; Event Sourcing&lt;/em&gt;
&lt;/li&gt;
&lt;li&gt;Decoupled aggregates for said pipelines&lt;/li&gt;
&lt;li&gt;Soft-realtime applications that needs broadcast, like chatrooms.&lt;/li&gt;
&lt;/ul&gt;
&lt;h3&gt;
  
  
  Overview
&lt;/h3&gt;

&lt;p&gt;You can subscribe to a &lt;code&gt;Topic&lt;/code&gt; in three ways, &lt;em&gt;simultaneously&lt;/em&gt; and &lt;em&gt;dynamically&lt;/em&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;spread&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

&lt;span class="c"&gt;// Channel based&lt;/span&gt;
&lt;span class="n"&gt;recvChan&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetRecvChannel&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;bufSize&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;recvChan&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="o"&gt;...&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// A separate, freshly spawned goroutine per message&lt;/span&gt;
&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HandleAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c"&gt;// Synchronous but blocks the topic&lt;/span&gt;
&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HandleSync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{}&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Performance Characteristics
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;Every topic has a inbound channel with a dedicated goroutine for broadcasting.&lt;/li&gt;
&lt;li&gt;Synchronous handlers in &lt;code&gt;HandleSync&lt;/code&gt; get executed in this goroutine.&lt;/li&gt;
&lt;li&gt;Asynchronous handlers or receiver channels that cannot keep up (with full buffers) get eliminated from the subscribers.&lt;/li&gt;
&lt;li&gt;Publishing is the same as sending to a buffered channel, blocks when full.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Plans
&lt;/h3&gt;

&lt;p&gt;I love the excellent &lt;a href="https://hexdocs.pm/phoenix_pubsub/Phoenix.PubSub.html"&gt;Phoenix.PubSub&lt;/a&gt; library in the Elixir ecosystem, and would love to achieve the same flavor and balance of usefulness, reliability and flexibility with &lt;strong&gt;Spread&lt;/strong&gt;. I believe we can come close.&lt;/p&gt;

&lt;p&gt;I also am very impressed by the &lt;a href="https://martinfowler.com/articles/lmax.html"&gt;LMAX Architecture&lt;/a&gt;, which prompted me to think about the PubSub pattern.&lt;br&gt;&lt;br&gt;
There are at least two impressive implementations of it in Golang, &lt;a href="https://github.com/smarty-prototypes/go-disruptor"&gt;go-distruptor&lt;/a&gt; and &lt;a href="https://github.com/alphadose/zenq"&gt;zenq&lt;/a&gt;, which I will thoroughly study and will try to join their very performant ideas with an easy-to-use API, in the long run. &lt;/p&gt;

&lt;p&gt;I also want to make it really easy to build &lt;em&gt;Event Sourced&lt;/em&gt; applications, &lt;strong&gt;Spread&lt;/strong&gt; can be a good building block for it.&lt;/p&gt;




&lt;p&gt;Github: &lt;a href="https://github.com/egemengol/spread"&gt;egemengol/spread&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If you think it is worth trying out or talking about, if you have any ideas, lets meet!&lt;/p&gt;

</description>
      <category>pubsub</category>
      <category>programming</category>
      <category>go</category>
      <category>eventbus</category>
    </item>
  </channel>
</rss>
