DEV Community

Masui Masanori
Masui Masanori

Posted on

[Go] Try Server-Sent Events

#go

Intro

I've tried WebSockets before, but some networks are blocking the WebSocket protocol, so I need another way instead.
According to the SignalR documentation, it uses Server-Sent Events (SSE), Ajax long polling, etc. as alternative transports.

This time, I will try SSE.

I will keep using the same project as before.

Using SSE with multiple clients

Most features of SSE are defined on the client side.

According to the PHP sample, the most important thing for using SSE on the server side is to set "Content-Type: text/event-stream" in the response header.

After the response header is set, the client keeps the connection open until it is closed.

Because SSE can only push data from the server to the client, I send data from the client side with POST requests.

To broadcast data to multiple clients, I wrote this program with reference to the gorilla/websocket sample.

Sample project

sse.html

<!DOCTYPE html>
<!-- Sample page for the SSE demo: connect to the event stream, post messages, and list the received ones. -->
<html>
    <head>
        <title>Server-Sent Events Sample</title>
        <meta charset="utf-8">
        <link href="css/site.css" rel="stylesheet" />
    </head>
    <body>
        <!-- User name; connect() in sse.page.ts reads it and sends it as the "user" query parameter -->
        <input type="text" id="user_name" placeholder="User Name">
        <!-- Not referenced by the sse.page.ts code shown in this article -->
        <div id="sample_message"></div>
        <!-- Message text; send() in sse.page.ts posts its value -->
        <textarea id="input_message"></textarea>
        <!-- NOTE(review): "Page" is presumably the global exposed by the bundled sse.page.js; confirm against the build config -->
        <button onclick="Page.connect()">Connect</button>
        <button onclick="Page.send()">Send</button>
        <button onclick="Page.close()">Close</button>
        <!-- Each received SSE message is appended here as a new <div> -->
        <div id="received_text_area"></div>
        <script src="/js/sse.page.js"></script>
    </body>
</html>
Enter fullscreen mode Exit fullscreen mode

sse.page.ts

// Current SSE connection; assigned by connect() and closed by close().
let es: EventSource;
// User name captured by connect() and attached to every message posted by send().
let userName = ""
/**
 * Opens the Server-Sent Events stream for the user name entered in the
 * "user_name" input and appends every received message to the
 * "received_text_area" element as a new <div>.
 *
 * Calling it again replaces the current connection instead of adding a
 * second one.
 */
export function connect(): void {
    const userNameInput = document.getElementById("user_name") as HTMLInputElement;
    userName = userNameInput.value;
    const receivedArea = document.getElementById("received_text_area") as HTMLElement;
    // Close any earlier stream; otherwise each extra click on "Connect" would
    // leave another connection open and every message would be shown twice.
    if (es) {
        es.close();
    }
    // The name is free text, so escape it before placing it in the query
    // string (characters such as "&", "#" or spaces would corrupt the URL).
    es = new EventSource(`http://localhost:8080/sse?user=${encodeURIComponent(userName)}`);
    es.onopen = (ev) => {
        console.log(ev);
    };
    es.onmessage = ev => {
        // textContent (not innerHTML) keeps received text from being parsed as markup
        const newText = document.createElement("div");
        newText.textContent = ev.data;
        receivedArea.appendChild(newText);
    };
    es.onerror = ev => {
        console.error(ev);
    };
}
/**
 * Posts the text of the "input_message" textarea, together with the current
 * user name, to the server as JSON and logs the JSON reply (or the error).
 */
export function send() {
    const messageInput = document.getElementById("input_message") as HTMLTextAreaElement;
    const payload = { message: messageInput.value, user: userName };
    const request: RequestInit = {
        method: "POST",
        body: JSON.stringify(payload),
    };
    fetch("http://localhost:8080/sse/message", request)
        .then((response) => response.json())
        .then((result) => console.log(result))
        .catch((reason) => console.error(reason));
}
/**
 * Closes the current SSE connection, if any.
 * Does nothing when no connection has been opened yet, so clicking "Close"
 * before "Connect" no longer throws.
 */
export function close() {
    if (es) {
        es.close();
    }
}
Enter fullscreen mode Exit fullscreen mode

main.go

...
// main starts the SSE hub goroutine, registers the HTTP handlers of the
// sample, and serves them on localhost:8080. Lines shown as "..." are
// elided in the article.
func main() {
    // newSseHub returns a pointer; the hub value is copied here and its
    // address is what the handlers below receive.
    hub := *newSseHub()
    // run loops forever, serving register/unregister/broadcast requests.
    go hub.run()
...
    // POST endpoint: broadcasts the posted message to the connected clients.
    http.HandleFunc("/sse/message", func(w http.ResponseWriter, r *http.Request) {
        sendSseMessage(w, r, &hub)
    })
    // Event-stream endpoint: registers the caller as an SSE client.
    http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {
        registerSseClient(w, r, &hub)
    })
    // templateHandler is defined outside this snippet.
    http.Handle("/pages/sse", &templateHandler{filename: "sse.html", serverUrl: "http://localhost:8080/sse"})
...
    // ListenAndServe only returns on failure; log.Fatal logs the error and exits.
    log.Fatal(http.ListenAndServe("localhost:8080", nil))
}
Enter fullscreen mode Exit fullscreen mode

sseHub.go

package main

// SseHub holds the set of connected SSE clients together with the channels
// through which clients are added, removed, and sent broadcast messages.
// In the code shown here the clients map is only touched from run().
type SseHub struct {
    // registered clients (used as a set; the value is always true)
    clients    map[*SseClient]bool
    // messages to deliver to every registered client
    broadcast  chan string
    // clients asking to be added to the set
    register   chan *SseClient
    // clients asking to be removed from the set
    unregister chan *SseClient
}

// newSseHub returns a hub with an empty client set and unbuffered
// broadcast, register, and unregister channels. The caller is expected to
// start run() so that those channels are serviced.
func newSseHub() *SseHub {
    hub := new(SseHub)
    hub.clients = make(map[*SseClient]bool)
    hub.broadcast = make(chan string)
    hub.register = make(chan *SseClient)
    hub.unregister = make(chan *SseClient)
    return hub
}
// run is the hub's event loop; it never returns. It adds clients received on
// register, removes clients received on unregister, and forwards each
// broadcast message to every registered client. A client whose channel
// cannot accept a message immediately is removed and its channel is closed.
func (h *SseHub) run() {
    // drop closes the client's channel and forgets the client.
    drop := func(c *SseClient) {
        close(c.messageChan)
        delete(h.clients, c)
    }
    for {
        select {
        case c := <-h.register:
            h.clients[c] = true
        case c := <-h.unregister:
            // Only drop clients that are still known, so the channel is
            // never closed twice.
            if _, known := h.clients[c]; known {
                drop(c)
            }
        case msg := <-h.broadcast:
            for c := range h.clients {
                select {
                case c.messageChan <- msg:
                default:
                    // Non-blocking send failed: the client is not ready.
                    drop(c)
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

sseClient.go

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
)

// SseClient represents one open event-stream connection.
type SseClient struct {
    // messages the hub wants pushed to this client; closed by the hub when
    // the client is removed
    messageChan chan string
    // value of the "user" request parameter given when the client connected
    userName    string
}
// SseMessage is the JSON body accepted by sendSseMessage.
type SseMessage struct {
    // text to broadcast
    Message string `json:"message"`
    // name of the sending user
    User    string `json:"user"`
}

// sseClientBuffer is the number of messages that may queue for one client.
// The hub sends without blocking and drops a client whose channel is full,
// so with no buffer a client would be dropped whenever a broadcast arrived
// while it was still writing the previous message.
const sseClientBuffer = 16

// registerSseClient turns the request into a Server-Sent Events stream.
//
// It reads the "user" parameter, registers a new client with hub, and then
// pushes every message the hub delivers until either the request context is
// cancelled (for example the browser called "es.close()") or the hub closes
// the client's channel. The client is unregistered on return.
//
// It answers 400 when the "user" parameter is missing and 500 when the
// ResponseWriter cannot flush.
func registerSseClient(w http.ResponseWriter, r *http.Request, hub *SseHub) {
    userName, err := getParam(r, "user")
    if err != nil {
        log.Println(err.Error())
        w.WriteHeader(http.StatusBadRequest)
        fmt.Fprint(w, "The parameters have no names")
        return
    }
    // Flushing is what actually pushes each event to the client; without it
    // the stream cannot work, so refuse instead of panicking on a nil flusher.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming is not supported", http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    // Send the headers right away so the client sees the stream as open
    // before the first message arrives.
    flusher.Flush()

    newClient := &SseClient{messageChan: make(chan string, sseClientBuffer), userName: userName}
    hub.register <- newClient
    defer func() {
        hub.unregister <- newClient
    }()

    ctx := r.Context()
    for {
        select {
        case message, open := <-newClient.messageChan:
            if !open {
                // The hub dropped this client and closed the channel. A
                // closed channel is always ready to receive, so continuing
                // would spin forever writing empty events.
                return
            }
            writeSseEvent(w, message)
            flusher.Flush()
        case <-ctx.Done():
            // when "es.close()" is called, this loop operation will be ended.
            return
        }
    }
}

// writeSseEvent writes message as one event in text/event-stream format:
// one "data:" line per line of the message, followed by the blank line that
// makes the client dispatch the event. CR, LF, and CRLF all count as line
// breaks. Write errors are ignored here; a broken connection ends the
// caller's loop through the request context.
func writeSseEvent(w http.ResponseWriter, message string) {
    start := 0
    for i := 0; i < len(message); i++ {
        c := message[i]
        if c != '\n' && c != '\r' {
            continue
        }
        fmt.Fprintf(w, "data: %s\n", message[start:i])
        if c == '\r' && i+1 < len(message) && message[i+1] == '\n' {
            i++ // CRLF is a single line break
        }
        start = i + 1
    }
    fmt.Fprintf(w, "data: %s\n\n", message[start:])
}
// sendSseMessage handles a posted chat message: it decodes the JSON request
// body into an SseMessage, hands the message text to hub for broadcasting,
// and replies with a JSON sampleResult.
//
// The reply has status 200 on success and 400 when the body cannot be read
// or is not valid JSON for SseMessage.
func sendSseMessage(w http.ResponseWriter, r *http.Request, hub *SseHub) {
    returnValue := &sampleResult{}
    w.Header().Set("Content-Type", "application/json")
    body, readBodyError := ioutil.ReadAll(r.Body)
    if readBodyError != nil {
        log.Println(readBodyError.Error())
        returnValue.Succeeded = false
        returnValue.ErrorMessage = "Failed reading values from body"
        writeSseResult(w, http.StatusBadRequest, returnValue)
        return
    }
    // Decode into a value, not a pointer to a pointer: with the latter a JSON
    // "null" body sets the pointer to nil and reading message.Message panics.
    var message SseMessage
    if convertError := json.Unmarshal(body, &message); convertError != nil {
        log.Println(convertError.Error())
        returnValue.Succeeded = false
        returnValue.ErrorMessage = "Failed converting to SseMessage"
        writeSseResult(w, http.StatusBadRequest, returnValue)
        return
    }
    // Blocks until the hub's run loop accepts the message.
    hub.broadcast <- message.Message

    returnValue.Message = "OK"
    returnValue.Succeeded = true
    writeSseResult(w, http.StatusOK, returnValue)
}

// writeSseResult sends result as the JSON response body with the given
// status code. If result cannot be encoded it logs the error and replies
// with 500 and no body; a failed write is logged.
func writeSseResult(w http.ResponseWriter, status int, result *sampleResult) {
    data, err := json.Marshal(result)
    if err != nil {
        log.Println(err.Error())
        w.WriteHeader(http.StatusInternalServerError)
        return
    }
    w.WriteHeader(status)
    if _, err := w.Write(data); err != nil {
        log.Println(err.Error())
    }
}
Enter fullscreen mode Exit fullscreen mode

Resources

Latest comments (0)