DEV Community

Mat
Mat

Posted on • Edited on • Originally published at cronusmonitoring.com

Handling socket connections across functions in Go

This guide shows how to manage socket connections across different functions using the socketmanager package.

This is a problem that i needed to solve.

Say you have a website and you want to measure the current active users. On the client side its pretty simple, every time the website is loaded, establish a web socket connection to the server.

Sample client side code

function App() {
  useEffect(() => {
    let ws = "wss";

    if (window.location.protocol === "http:") ws = "ws";
    const socket = new WebSocket(
      `${ws}://${window.location.host}/active-users`
    );

    return () => {
      socket.close();
    };
  }, []);
Enter fullscreen mode Exit fullscreen mode

Then on the server side you would have a handler for when the client connects to the server.

Sample server side code

Creating the handler

func WSHandler(w http.ResponseWriter, r *http.Request, upgrader websocket.Upgrader) {

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        fmt.Println(err.Error())
        return
    }

    defer conn.Close()
    for {
        _, _, err := conn.ReadMessage()
        if err != nil {
            break
        }

        time.Sleep(time.Second)
    }
}
Enter fullscreen mode Exit fullscreen mode

Calling the handler for /active-users

    mux.HandleFunc("/active-users", func(w http.ResponseWriter, r *http.Request) {
        WSHandler(w, r, upgrader)
    })
Enter fullscreen mode Exit fullscreen mode

This is great but theres one problem, You cant break out of the function to measure the active users. Each function WSHandler running is not aware of other WSHandler functions running.

This is where the package i created comes in to save the day! Heres how it works.

  1. Install the package
go get -u github.com/mperkins808/socketmanager/go/pkg/socketmanager
Enter fullscreen mode Exit fullscreen mode
  1. Establish a socketmanager in your main function.
// creating the socket manager
sm := socketmanager.NewSimpleSocketManager()
// optionally create it and add to context instead 
ctx := socketmanager.NewSimpleSocketManager().WithContext(context.Background())

Enter fullscreen mode Exit fullscreen mode
  1. Now we can parse socketmanager as either a direct argument or through context through to our handlers
// directly 
mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        WSHandler(w, r, sm, upgrader)
    })

// through context 
mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        WSHandler(w, r.WithContext(ctx), upgrader)
    })

Enter fullscreen mode Exit fullscreen mode
  1. Now lets update our websocket handler so that when there is a new connection this is added to our list of active sockets.

Here everytime a new user connects on the client. Our list of active sockets is increased.

func WSHandler(w http.ResponseWriter, r *http.Request, upgrader websocket.Upgrader) {

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        fmt.Println(err.Error())
        return
    }
// get socket manager from context
    sm, err := socketmanager.GetSocketManagerFromContext(r.Context())
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    sID := uuid.New().String()
    sm.Add(sID, sID)

    defer sm.Remove(sID)
    defer conn.Close()
    for {
        _, _, err := conn.ReadMessage()
        if err != nil {
            break
        }

        time.Sleep(time.Second)
    }
}
Enter fullscreen mode Exit fullscreen mode
  1. Finally. Lets create a function that runs in the background and logs the number of active users
    go func(ctx context.Context) {
        for {
            sm, err := socketmanager.GetSocketManagerFromContext(ctx)
            if err != nil {
                // handle err
            } else {
                fmt.Printf("active users %v\n", len(sm.GetActiveSockets()))
            }
            time.Sleep(time.Second)
        }
    }(ctx)
Enter fullscreen mode Exit fullscreen mode

The end result

I just used the npm package wscat to act as the client for this tutorial. Then as each client connected, the number of active users increased.

Connecting websocket clients to server

Real use cases

Logging out the active users is nice but its not the reason i created this package. Its main use case is for instructing the client when to fetch more data. Say you have an endpoint that fetches some data, this data can update at any time and you would like the client to fetch the latest data when this occurs.

  1. First step is to establish a websocket connection. Thats what the first code block in this tutorial is doing, but we'll expand it to handle an update message. So when the page loads we'll call handleFetchData(), and then if the update message is received we'll call handleFetchData() again
  useEffect(() => {

    handleFetchData();
    let ws = "wss";
    if (window.location.protocol === "http:") ws = "ws";

    const socket = new WebSocket(
      `${ws}://${window.location.host}/device-ws?id=${props.currentUser.uid}`
    );

    socket.addEventListener("message", (event) => {
      if (event.data === "update") {
        handleFetchData();
      }
    });
    return () => {
      socket.close();
    };
  }, [props.currentUser]);
Enter fullscreen mode Exit fullscreen mode
  1. Now lets update the websocket handler to check if an update is due, if it is then send the update message to the client.

This updated function is doing a few things.

  • getting the userid (id) from the query
  • adding that userid to the socket manager
  • if an update is due, then it sends the update message to that user client
func WSDeviceHandler(w http.ResponseWriter, r *http.Request, upgrader websocket.Upgrader) {
    id := r.URL.Query().Get("id")
    if id == "" {
        return
    }

    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Error(err)
        return
    }
        sm, err := socketmanager.GetSocketManagerFromContext(ctx)
        if err != nil {
       // handle err
    } 
    sID := uuid.New().String()

    sm.Add(id, sID)
    defer sm.Remove(id)
    defer conn.Close()
    for {
        if err != nil {
            return
        }
        if sm.UpdateDue(id) {
            log.Info("update due for ", id)
            sm.SetUpdateDue(id, false)

            err = conn.WriteMessage(1, []byte("update"))
            if err != nil {
                return
            }

        }
        time.Sleep(time.Second)
    }
}
Enter fullscreen mode Exit fullscreen mode
  1. Finally we have another function that determines if an update is due for a user
func DataHandler(w http.ResponseWriter, r *http.Request) {
   ...
   // above work determining if an update is due
   // socketmanager retrieved from context like prior examples 
   sm.SetUpdateDue(id, true)
   ...
Enter fullscreen mode Exit fullscreen mode

What did we achieve

  • Ability to manage active sockets across many functions
  • Extract state out of a websocket or http handler
  • Enable a more reactive client

Real example of how i use the package

I created a IOS app Cronus that brings Prometheus and Google Cloud Monitoring to mobiles. You can view your metrics and alerts wherever you are. The way i have set up the app is that IOS devices need to be added by scanning a QR code on cronusmonitoring.com. Every time a device is scanned and added. The page showing the active devices should automatically updated.

Cronus adding a device

On the backend i have websocket that is measuring the active users on the /devices page. Then when a device is successfully onboarded (through a separate handler), socketmanager tells the websocket that that particular socket needs to send an update message. Finally on the user's page the client pulls their devices data when told to update.

Top comments (0)