DEV Community

Daniel Albuschat

Go: Asynchronous Real-Time Broadcasting using Channels and WebSockets

Recently, I needed to execute a long-running command on a server and send the result to potentially multiple clients in real time. I naturally chose WebSockets as the transport layer. But when implementing the backend in Go, I was looking for a way to distribute the output of exec.Command to multiple clients in a thread-safe manner. In Go, channels are the tool of choice for sending data asynchronously and safely between goroutines, but each value sent on a channel is received by exactly one receiver. There's no way for multiple clients to "hook into" a channel and all see the same value. So I needed a different solution. During my search, I learned that you can send channels over channels, which makes channels a much more versatile tool than they had been for me. With this, many of the constraints that I thought channels had were lifted.
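To make the limitation concrete, here is a small sketch (not part of the original project) in which several goroutines receive from one shared channel. The countDeliveries helper is a name made up for this example. Each value ends up with exactly one receiver, so the per-receiver counts always add up to the number of values sent and nothing is "broadcast":

```go
package main

import (
	"fmt"
	"sync"
)

// countDeliveries sends n values on one shared channel while `receivers`
// goroutines compete for them. It returns how many values each receiver got.
func countDeliveries(n, receivers int) []int {
	shared := make(chan int)
	counts := make([]int, receivers)
	var wg sync.WaitGroup
	for r := 0; r < receivers; r++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range shared {
				counts[id]++ // each goroutine only touches its own slot
			}
		}(r)
	}
	for i := 0; i < n; i++ {
		shared <- i
	}
	close(shared)
	wg.Wait()
	return counts
}

func main() {
	counts := countDeliveries(10, 2)
	// How the values are split varies from run to run,
	// but no value is ever delivered twice.
	fmt.Println(counts, "total:", counts[0]+counts[1])
}
```

This is why the rest of the post gives every client its own channel and hands those channels to the server.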

Because channels can carry other channels, you can write stuff like this, which is basically a client/server infrastructure:

serverChan := make(chan chan string, 1)
clientChan := make(chan string, 1)
serverChan <- clientChan

Yes, we sent a channel into a channel! This doesn't do much, indeed. We need to receive the clientChan and send something back, to have a true client/server feeling:

go func(serverChan chan chan string) {
    client := <-serverChan
    client <- "Hello, World!"
}(serverChan)
fmt.Println(<-clientChan)
close(serverChan)
close(clientChan)

Yay, we made it! We sent a channel via a channel, just to send back a string. That in itself might already be breathtaking (right?), but it gets even better: we can use this as a building block to register multiple clients with the server, so that the server can "broadcast" stuff to all of them in real time. We use select for that; a goroutine blocked in select sleeps until one of its channels is ready, so it uses no CPU while waiting.

Let's have a look at the server code:

func server(serverChan chan chan string) {
    var clients []chan string
    for {
        select {
        case client, _ := <-serverChan:
            clients = append(clients, client)
            // Broadcast the number of clients to all clients:
            for _, c := range clients {
                c <- fmt.Sprintf("%d client(s) connected.", len(clients))
            }
        }
    }
}

The server just listens endlessly for new clients, adds all of them to a list and sends a text stating the number of total clients to all clients, whenever a new client connects.

This is what the client code can look like:

func client(clientName string, clientChan chan string) {
    for {
        text, _ := <-clientChan
        fmt.Printf("%s: %s\n", clientName, text)
    }
}

And this code glues it all together:

// Start the server:
serverChan := make(chan chan string, 4)
go server(serverChan)

// Connect the clients:
client1Chan := make(chan string, 4)
client2Chan := make(chan string, 4)
serverChan <- client1Chan
serverChan <- client2Chan

// Notice that we have to start the clients in their own goroutine,
// because we would have a deadlock otherwise:
go client("Client 1", client1Chan)
go client("Client 2", client2Chan)

// Just a dirty hack to wait for everything to finish up.
// A clean and safe approach would have been too much boilerplate code
// for this blog-post
time.Sleep(time.Second)

This will output, in some semi-random order:

Client 1: 1 client(s) connected.
Client 1: 2 client(s) connected.
Client 2: 2 client(s) connected.

Voilà! In theory, this should scale "indefinitely", just as indefinitely as everything in computer science does. :-)
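The time.Sleep hack can be avoided with a little more code. The following is a minimal sketch rather than the code from above: it assumes the server is allowed to stop once serverChan is closed, and the run helper and the returned slices are additions made up for this example. The server closes every client channel when it is done, the clients read until their channel is closed, and a sync.WaitGroup waits for all of them:

```go
package main

import (
	"fmt"
	"sync"
)

// server registers clients until serverChan is closed, then closes every
// client channel so the clients know that no more messages will arrive.
func server(serverChan <-chan chan string) {
	var clients []chan string
	for client := range serverChan {
		clients = append(clients, client)
		for _, c := range clients {
			c <- fmt.Sprintf("%d client(s) connected.", len(clients))
		}
	}
	for _, c := range clients {
		close(c)
	}
}

// client drains clientChan until it is closed and returns what it saw.
func client(name string, clientChan <-chan string) []string {
	var lines []string
	for text := range clientChan {
		lines = append(lines, fmt.Sprintf("%s: %s", name, text))
	}
	return lines
}

// run wires two clients to the server and waits for them without sleeping.
func run() [][]string {
	serverChan := make(chan chan string, 4)
	go server(serverChan)

	chans := []chan string{make(chan string, 4), make(chan string, 4)}
	results := make([][]string, len(chans))
	var wg sync.WaitGroup
	for i, ch := range chans {
		serverChan <- ch
		wg.Add(1)
		go func(i int, ch chan string) {
			defer wg.Done()
			results[i] = client(fmt.Sprintf("Client %d", i+1), ch)
		}(i, ch)
	}
	close(serverChan) // no more clients will connect
	wg.Wait()         // returns once every client has seen its channel close
	return results
}

func main() {
	for _, lines := range run() {
		for _, line := range lines {
			fmt.Println(line)
		}
	}
}
```

Closing the channel from the sending side is what lets the range loops end, so nobody has to guess how long "everything" takes.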

Now we can hook this up to our WebSocket server. I chose github.com/gorilla/websocket as my WebSocket library. In this example, the server will just count its uptime in seconds and push this info to all clients. We can choose whether new clients get all output that has happened since the server started, or just the new stuff since the client connected. In my implementation, I chose to send only the new stuff for now.
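For comparison, the other option (replaying everything that has happened so far to a newly connected client) could look roughly like the sketch below. The replayServer function, its messages channel and the unbounded history slice are assumptions made for this example and are not part of the implementation in this post:

```go
package main

import "fmt"

// replayServer broadcasts every message to all registered clients and
// replays the full history to each client that registers later.
// It stops when either input channel is closed.
func replayServer(register <-chan chan string, messages <-chan string) {
	var clients []chan string
	var history []string // grows without bound; a real server would cap it
	for {
		select {
		case client, ok := <-register:
			if !ok {
				return
			}
			// Bring the newcomer up to date before it joins the broadcast.
			for _, old := range history {
				client <- old
			}
			clients = append(clients, client)
		case msg, ok := <-messages:
			if !ok {
				return
			}
			history = append(history, msg)
			for _, c := range clients {
				c <- msg
			}
		}
	}
}

func main() {
	register := make(chan chan string)
	messages := make(chan string)
	go replayServer(register, messages)

	messages <- "1 seconds uptime"
	messages <- "2 seconds uptime"

	// This client connects late but still receives the first two messages.
	late := make(chan string, 8)
	register <- late
	messages <- "3 seconds uptime"

	for i := 0; i < 3; i++ {
		fmt.Println(<-late)
	}
}
```

Because a single goroutine owns both the history and the client list, the replay and the live broadcast cannot interleave, so a late client sees the messages in their original order.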

First off, let's write a slightly modified server that counts its uptime:

func uptimeServer(serverChan chan chan string) {
    var clients []chan string
    uptimeChan := make(chan int, 1)
    // This goroutine will count our uptime in the background, and write
    // updates to uptimeChan:
    go func (target chan int) {
        i := 0
        for {
            time.Sleep(time.Second)
            i++
            target <- i
        }
    }(uptimeChan)
    // And now we listen to new clients and new uptime messages:
    for {
        select {
        case client, _ := <-serverChan:
            clients = append(clients, client)
        case uptime, _ := <-uptimeChan:
            // Send the uptime to all connected clients:
            for _, c := range clients {
                c <- fmt.Sprintf("%d seconds uptime", uptime)
            }
        }
    }
}

This was rather easy. However, if you've read carefully, you will have noticed that clients are registered via append(clients, client), but never removed. This leads to orphaned clients: once nobody reads from such a client's channel anymore, its buffer fills up and the server blocks when it tries to write to it, which stalls the broadcast for everyone. We will leave it at that for the moment, in order not to clutter the code with boilerplate. So beware that your server will stop working properly when you close browser tabs in the following sections.
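One possible way to remove clients again is sketched below. It is not the code used in this post: the hub type, its unregister and broadcast channels and the drop helper are names invented for this example. The idea is that the server gets a second channel on which clients sign off, and that it never blocks on a client whose buffer is full:

```go
package main

import "fmt"

// hub is a hypothetical replacement for the server goroutine. It owns the
// set of clients and is the only goroutine that touches it.
type hub struct {
	register   chan chan string
	unregister chan chan string
	broadcast  chan string
}

func newHub() *hub {
	return &hub{
		register:   make(chan chan string),
		unregister: make(chan chan string),
		broadcast:  make(chan string),
	}
}

// drop removes a client and closes its channel so that readers notice.
func drop(clients map[chan string]struct{}, c chan string) {
	if _, ok := clients[c]; ok {
		delete(clients, c)
		close(c)
	}
}

func (h *hub) run() {
	clients := make(map[chan string]struct{})
	for {
		select {
		case c := <-h.register:
			clients[c] = struct{}{}
		case c := <-h.unregister:
			drop(clients, c)
		case msg := <-h.broadcast:
			for c := range clients {
				select {
				case c <- msg:
				default:
					// Buffer full: nobody is reading, so give up on this
					// client instead of blocking everyone else.
					drop(clients, c)
				}
			}
		}
	}
}

func main() {
	h := newHub()
	go h.run()

	fast := make(chan string, 1)
	stuck := make(chan string, 1) // never read from, like a closed browser tab
	h.register <- fast
	h.register <- stuck

	h.broadcast <- "1 seconds uptime"
	fmt.Println("fast got:", <-fast)
	h.broadcast <- "2 seconds uptime" // stuck's buffer is full, so it is dropped
	fmt.Println("fast got:", <-fast)

	h.unregister <- fast // explicit sign-off
	_, open := <-fast
	fmt.Println("fast still open:", open)
}
```

The trade-off of the non-blocking send is that a client that is merely slow gets dropped as well; a larger buffer per client makes that less likely.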

Next, we write our WebSocket server around our new uptimeServer goroutine:

// This upgrader is needed for WebSocket connections later:
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true // Accept any origin (disables the origin check); for testing only
    },
}

// Start the server and keep track of the channel that it receives
// new clients on:
serverChan := make(chan chan string, 4)
go uptimeServer(serverChan)

// Define an HTTP handler function for the /status endpoint that accepts only
// WebSocket connections, so opening it directly in your browser will fail.
http.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
    // Upgrade this HTTP connection to a WS connection:
    ws, _ := upgrader.Upgrade(w, r, nil)
    // And register a client for this connection with the uptimeServer:
    client := make(chan string, 1)
    serverChan <- client
    // And now check for uptimes written to the client indefinitely.
    // Yes, we are lacking proper error and disconnect checking here, too:
    for {
        select {
        case text, _ := <-client:
            writer, _ := ws.NextWriter(websocket.TextMessage)
            writer.Write([]byte(text))
            writer.Close()
        }
    }
})
http.ListenAndServe(":8080", nil)

Phew, that's a harder nut to crack. As the inline comments explain, we set up the WebSocket infrastructure, start our "server" that counts the uptime, and then define an HTTP handler function to receive client requests. Each request registers a client with the uptime server, and the handler then writes every text it receives from the uptime server to the WebSocket client.
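The write loop of the handler is where the missing error checks matter most. Below is a minimal sketch of how that loop could stop on the first failed write and tell the server to forget the client. To keep it runnable without third-party packages, the WebSocket connection is replaced by a hypothetical textSender interface; pump, flakyConn and the unregister channel are likewise invented for this example, and the sketch assumes a server that reads from unregister and never blocks on a client:

```go
package main

import (
	"errors"
	"fmt"
)

// textSender is a hypothetical stand-in for the WebSocket connection, so
// that this sketch runs without any third-party package.
type textSender interface {
	SendText(msg string) error
}

// pump forwards messages from client to conn until the channel is closed or
// a send fails. On the way out it hands the channel to unregister so that
// the server can stop broadcasting to it.
func pump(conn textSender, client chan string, unregister chan<- chan string) error {
	defer func() { unregister <- client }()
	for text := range client {
		if err := conn.SendText(text); err != nil {
			return fmt.Errorf("sending %q: %w", text, err)
		}
	}
	return nil
}

// flakyConn is a fake connection for the demo: it fails after failAfter
// sends, like a browser tab that has been closed.
type flakyConn struct {
	failAfter int
	sent      []string
}

func (c *flakyConn) SendText(msg string) error {
	if len(c.sent) >= c.failAfter {
		return errors.New("connection closed")
	}
	c.sent = append(c.sent, msg)
	return nil
}

func main() {
	conn := &flakyConn{failAfter: 2}
	client := make(chan string, 4)
	unregister := make(chan chan string, 1)
	for i := 1; i <= 3; i++ {
		client <- fmt.Sprintf("%d seconds uptime", i)
	}

	err := pump(conn, client, unregister)
	fmt.Println("delivered:", conn.sent)
	fmt.Println("stopped because:", err)
	fmt.Println("unregistered:", (<-unregister) == client)
}
```

In the real handler, SendText could be implemented with ws.WriteMessage(websocket.TextMessage, []byte(msg)), and the error returned by upgrader.Upgrade should be checked before ws is used at all.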

You can compile and start this, and test it out with this minimalistic JavaScript example:
https://jsfiddle.net/L31wy1gL/13/
(BTW: That's why we disable the origin check (CheckOrigin) in the code; to be able to reach the WebSocket server from jsfiddle/codepen/your dummy index.html, etc.)


That's where I'll end this blog post. You have seen a lot:

• Send channels into channels
• Use this to implement a "client"-registry in a goroutine
• Send broadcast messages to these clients
• Wrap all this into a WebSocket server

But you still have much to do:

• Implement proper error handling
• Implement proper handling for disconnecting clients
• Maybe turn this into a library that you can re-use across your project(s)

I hope you enjoyed this read and would be grateful for suggestions and feedback in the comments.

Top comments (1)

Filip Kostovski

Great post. I wrote something similar using a different approach; the whole WebSocket handling logic is around this line: github.com/sirfilip/webchat/blob/m... I think that it handles all of the logic for closing WebSockets.