DEV Community

Jones Charles

Real-time Updates Made Easy: Building Server-Sent Events with GoFrame 🚀

Hey there, fellow developers! 👋 Ever needed to add real-time updates to your Go application but found WebSockets a bit too complex for your needs? Enter Server-Sent Events (SSE) - a simpler alternative that's perfect for one-way server-to-client communication.

In this guide, I'll walk you through implementing SSE using GoFrame, taking you from basic implementation all the way to production-ready code. Let's dive in!

What are Server-Sent Events? 🤔

SSE is a standard that enables servers to push real-time updates to clients over HTTP. Unlike WebSocket, SSE:

  • Is one-way (server to client only)
  • Uses standard HTTP
  • Automatically reconnects if the connection is lost
  • Is simpler to implement

Perfect for: real-time notifications, live feeds, status updates, and monitoring dashboards!
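
To make the wire format concrete, here is a minimal sketch of how a single event is laid out on the stream: optional id: and event: lines, one data: line per line of payload, and a blank line to end the event. The helper name formatEvent is my own (it is not part of GoFrame), and the sketch uses only the standard library.

```go
package main

import (
	"fmt"
	"strings"
)

// formatEvent renders one event in the text/event-stream wire format.
// Empty id or event values are omitted. Each line of data gets its own
// "data:" prefix, and a blank line terminates the event.
// (formatEvent is a hypothetical helper for illustration.)
func formatEvent(id, event, data string) string {
	var b strings.Builder
	if id != "" {
		fmt.Fprintf(&b, "id: %s\n", id)
	}
	if event != "" {
		fmt.Fprintf(&b, "event: %s\n", event)
	}
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n")
	return b.String()
}

func main() {
	fmt.Print(formatEvent("", "", "hello"))
	fmt.Print(formatEvent("42", "price_update", `{"symbol":"AAPL"}`))
	fmt.Print(formatEvent("", "", "line one\nline two"))
}
```

Splitting the payload by line matters because a raw newline inside a single data: line would end the field early; the client joins consecutive data: lines back together with newlines.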

Getting Started: Basic SSE Implementation 🌱

Let's start with a simple example. Here's how to create your first SSE endpoint in GoFrame:

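import (
    "fmt"
    "time"

    "github.com/gogf/gf/v2/frame/g"
    "github.com/gogf/gf/v2/net/ghttp"
    "github.com/gogf/gf/v2/os/gtime"
)

func SseHandler(r *ghttp.Request) {
    // Set SSE headers
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")
    r.Response.Header().Set("Access-Control-Allow-Origin", "*")

    // Create message channel. It is deliberately never closed: the producer
    // goroutine may still be mid-send when the handler returns, and sending
    // on a closed channel panics.
    messageChan := make(chan string)

    // Send updates every 2 seconds until the client disconnects
    go func() {
        for {
            message := fmt.Sprintf("Current time: %s", gtime.Now().String())
            select {
            case messageChan <- message:
            case <-r.Context().Done():
                return // client gone, stop producing
            }
            time.Sleep(time.Second * 2)
        }
    }()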

    // Write to client
    for {
        select {
        case message := <-messageChan:
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", message)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

Register the route:

func main() {
    s := g.Server()
    s.BindHandler("/sse", SseHandler)
    s.SetPort(8080) // match the port the frontend example connects to
    s.Run()
}

And here's how to connect from the frontend:

const eventSource = new EventSource('http://localhost:8080/sse');

eventSource.onmessage = function(event) {
    console.log('Got update:', event.data);
};

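eventSource.onerror = function(error) {
    console.error('Connection error:', error);
    // Note: close() turns off EventSource's automatic reconnect.
    // Remove this line if you want the browser to retry on its own.
    eventSource.close();
};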

Real-World Examples: Let's Build Something Cool! 🛠️

1. Live Stock Ticker 📈

Let's build something more practical - a real-time stock price feed:

type StockPrice struct {
    Symbol string  `json:"symbol"`
    Price  float64 `json:"price"`
    Time   string  `json:"time"`
}

func StockPriceHandler(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

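    // Never closed: the producer goroutine may still be mid-send when the handler returns
    messageChan := make(chan StockPrice)

    // Simulate stock updates until the client disconnects
    go func() {
        stocks := []string{"AAPL", "GOOGL", "MSFT"}
        for {
            for _, symbol := range stocks {
                price := StockPrice{
                    Symbol: symbol,
                    Price:  rand.Float64() * 1000,
                    Time:   gtime.Now().String(),
                }
                select {
                case messageChan <- price:
                case <-r.Context().Done():
                    return // client gone, stop producing
                }
            }
            time.Sleep(time.Second * 3)
        }
    }()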

    for {
        select {
        case price := <-messageChan:
            data, err := gjson.Encode(price)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

2. Real-time Chat Room Status 💬

Monitor active users and typing indicators in a chat room:

type ChatStatus struct {
    ActiveUsers    int      `json:"activeUsers"`
    TypingUsers    []string `json:"typingUsers"`
    LastActivity   string   `json:"lastActivity"`
}

func ChatStatusHandler(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    // Never closed: the producer goroutine may still be mid-send when the handler returns
    statusChan := make(chan ChatStatus)

    // Track room status
    roomStatus := ChatStatus{
        ActiveUsers: 0,
        TypingUsers: make([]string, 0),
    }

    // Handle user status updates (simplified example)
    go func() {
        for {
            // Simulate status changes
            roomStatus.ActiveUsers = rand.Intn(50) + 10
            // Intn(3) yields 0, 1, or 2 typing users
            roomStatus.TypingUsers = []string{"Alice", "Bob"}[0:rand.Intn(3)]
            roomStatus.LastActivity = gtime.Now().String()

            select {
            case statusChan <- roomStatus:
            case <-r.Context().Done():
                return // client gone, stop producing
            }
            time.Sleep(time.Second * 2)
        }
    }()

    for {
        select {
        case status := <-statusChan:
            data, err := gjson.Encode(status)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

3. System Monitoring Dashboard 📊

Monitor system metrics in real-time:

type SystemMetrics struct {
    CPU       float64   `json:"cpu"`
    Memory    float64   `json:"memory"`
    Disk      float64   `json:"disk"`
    Network   NetworkStats `json:"network"`
    Timestamp string    `json:"timestamp"`
}

type NetworkStats struct {
    BytesIn  int64 `json:"bytesIn"`
    BytesOut int64 `json:"bytesOut"`
}

func SystemMetricsHandler(r *ghttp.Request) {
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

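    // Never closed: the producer goroutine may still be mid-send when the handler returns
    metricsChan := make(chan SystemMetrics)

    // Collect and send metrics until the client disconnects.
    // The getSystem*/getNetworkStats helpers are placeholders you need to supply.
    go func() {
        for {
            metrics := SystemMetrics{
                CPU:       getSystemCPUUsage(),
                Memory:    getSystemMemoryUsage(),
                Disk:      getSystemDiskUsage(),
                Network:   getNetworkStats(),
                Timestamp: gtime.Now().String(),
            }

            select {
            case metricsChan <- metrics:
            case <-r.Context().Done():
                return // client gone, stop collecting
            }
            time.Sleep(time.Second * 5)
        }
    }()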

    for {
        select {
        case metrics := <-metricsChan:
            data, err := gjson.Encode(metrics)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

On the frontend, consume the metrics stream:

const eventSource = new EventSource('/system/metrics');

eventSource.onmessage = (event) => {
    const metrics = JSON.parse(event.data);

    // Update CPU gauge
    updateGauge('cpu-gauge', metrics.cpu);

    // Update memory usage
    updateProgressBar('memory-bar', metrics.memory);

    // Update network graph
    updateNetworkGraph(metrics.network);

    // Update timestamp
    document.getElementById('last-update').textContent = 
        new Date(metrics.timestamp).toLocaleString();
};

4. Live Order Processing Status ⚡

Track order processing status in real-time:

type OrderStatus struct {
    OrderID    string   `json:"orderId"`
    Status     string   `json:"status"`
    Steps      []Step   `json:"steps"`
    UpdateTime string   `json:"updateTime"`
}

type Step struct {
    Name      string `json:"name"`
    Status    string `json:"status"` // pending, in-progress, completed, failed
    StartTime string `json:"startTime"`
    EndTime   string `json:"endTime"`
}

func OrderStatusHandler(r *ghttp.Request) {
    orderID := r.Get("orderId").String()
    if orderID == "" {
        r.Response.WriteStatus(http.StatusBadRequest)
        return
    }

    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

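    // Never closed: the producer goroutine may still be mid-send when the handler returns
    statusChan := make(chan OrderStatus)

    // Process order and send updates
    go func() {
        steps := []string{
            "Order Received",
            "Payment Processing",
            "Inventory Check",
            "Packaging",
            "Shipping Label Created",
            "Ready for Pickup", // trailing comma is required before the newline
        }

        orderStatus := OrderStatus{
            OrderID: orderID,
            Status:  "processing",
            Steps:   make([]Step, len(steps)),
        }

        // send delivers a snapshot and reports false once the client is gone.
        // Steps is copied so later edits here don't race with encoding in the handler.
        send := func() bool {
            snapshot := orderStatus
            snapshot.Steps = append([]Step(nil), orderStatus.Steps...)
            snapshot.UpdateTime = gtime.Now().String()
            select {
            case statusChan <- snapshot:
                return true
            case <-r.Context().Done():
                return false
            }
        }

        // Simulate order processing
        for i, stepName := range steps {
            orderStatus.Steps[i] = Step{
                Name:      stepName,
                Status:    "in-progress",
                StartTime: gtime.Now().String(),
            }
            if !send() {
                return
            }

            // Simulate processing time
            time.Sleep(time.Second * time.Duration(rand.Intn(3)+1))

            orderStatus.Steps[i].Status = "completed"
            orderStatus.Steps[i].EndTime = gtime.Now().String()

            if i == len(steps)-1 {
                orderStatus.Status = "completed"
            }
            if !send() {
                return
            }
        }
    }()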

    for {
        select {
        case status := <-statusChan:
            data, err := gjson.Encode(status)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

Frontend code for order tracking:

const orderId = 'ORDER123'; // Get from your order
const eventSource = new EventSource(`/order/status?orderId=${encodeURIComponent(orderId)}`);

eventSource.onmessage = (event) => {
    const status = JSON.parse(event.data);

    // Update overall status
    document.getElementById('order-status').textContent = status.status;

    // Update progress steps
    status.steps.forEach((step, index) => {
        const stepElement = document.getElementById(`step-${index}`);
        stepElement.className = `step ${step.status}`;

        if (step.endTime) {
            stepElement.querySelector('.time').textContent = 
                new Date(step.endTime).toLocaleString();
        }
    });

    // Close connection if order is completed
    if (status.status === 'completed') {
        eventSource.close();
    }
};

5. Live Sports Score Updates 🏆

Track live game scores and statistics:

type GameStats struct {
    GameID      string    `json:"gameId"`
    HomeTeam    TeamStats `json:"homeTeam"`
    AwayTeam    TeamStats `json:"awayTeam"`
    Period      int       `json:"period"`
    TimeLeft    string    `json:"timeLeft"`
    LastUpdate  string    `json:"lastUpdate"`
}

type TeamStats struct {
    Name    string `json:"name"`
    Score   int    `json:"score"`
    Shots   int    `json:"shots"`
    Fouls   int    `json:"fouls"`
    Timeout int    `json:"timeouts"`
}

func LiveGameHandler(r *ghttp.Request) {
    gameID := r.Get("gameId").String()
    if gameID == "" {
        r.Response.WriteStatus(http.StatusBadRequest)
        return
    }

    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")

    // Never closed: the producer goroutine may still be mid-send when the handler returns
    statsChan := make(chan GameStats)

    // Simulate live game updates until the client disconnects
    go func() {
        gameStats := GameStats{
            GameID:   gameID,
            Period:   1,
            TimeLeft: "12:00",
            HomeTeam: TeamStats{Name: "Home", Score: 0},
            AwayTeam: TeamStats{Name: "Away", Score: 0},
        }

        for {
            // Simulate game events
            if rand.Float32() < 0.3 { // 30% chance of score change
                if rand.Float32() < 0.5 {
                    gameStats.HomeTeam.Score += 2
                } else {
                    gameStats.AwayTeam.Score += 2
                }
            }

            gameStats.LastUpdate = gtime.Now().String()
            select {
            case statsChan <- gameStats:
            case <-r.Context().Done():
                return // client gone, stop producing
            }

            time.Sleep(time.Second * 3)
        }
    }()

    for {
        select {
        case stats := <-statsChan:
            data, err := gjson.Encode(stats)
            if err != nil {
                g.Log().Error(r.Context(), err)
                return
            }
            r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}

Making it Production-Ready 🛠️

1. Add a Heartbeat

Keep connections alive with periodic pings:

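// sendHeartbeat blocks until the client disconnects. Avoid running it in a
// goroutine next to another loop that writes to the same r.Response:
// concurrent writes to one response are not safe.
func sendHeartbeat(r *ghttp.Request) {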
    ticker := time.NewTicker(time.Second * 30)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            r.Response.Write([]byte(": heartbeat\n\n"))
            r.Response.Flush()
        case <-r.Context().Done():
            return
        }
    }
}
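
If you need heartbeats and regular messages on the same connection, one option is to handle both in a single select loop so that only one goroutine ever writes to the response. Here is a minimal sketch of that idea using only the standard library; stream and demo are names I made up for illustration, and an io.Writer stands in for the response. In a real handler you would pass ticker.C from a time.NewTicker as the beat channel and flush after every write.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"strings"
	"time"
)

// stream writes messages and heartbeat comments to w from a single goroutine,
// so the two kinds of writes can never interleave. It returns when ctx is
// cancelled or msgs is closed. (Hypothetical helper, not a GoFrame API.)
func stream(ctx context.Context, w io.Writer, msgs <-chan string, beat <-chan time.Time) {
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				return
			}
			fmt.Fprintf(w, "data: %s\n\n", m)
		case <-beat:
			// Lines starting with ":" are comments; clients ignore them.
			io.WriteString(w, ": heartbeat\n\n")
		case <-ctx.Done():
			return
		}
	}
}

// demo drives stream with hand-fed channels and returns what was written.
func demo() string {
	var out strings.Builder
	msgs := make(chan string)
	beat := make(chan time.Time)
	done := make(chan struct{})
	go func() {
		stream(context.Background(), &out, msgs, beat)
		close(done)
	}()
	msgs <- "first"
	beat <- time.Now()
	msgs <- "second"
	close(msgs)
	<-done // wait for stream to finish before reading out
	return out.String()
}

func main() {
	fmt.Print(demo())
}
```

Because the channels are unbuffered, each send in demo completes only after stream has picked it up, so the output order is fixed: the first message, a heartbeat comment, then the second message.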

2. Implement Retry Logic

Make your frontend resilient:

function connectSSE() {
    const eventSource = new EventSource('/sse');

    eventSource.onmessage = (event) => {
        console.log('Got update:', event.data);
    };

    eventSource.onerror = (error) => {
        console.error('Connection lost:', error);
        eventSource.close();

        // Retry after 3 seconds
        setTimeout(connectSSE, 3000);
    };
}

3. Support Different Event Types 🔄

Handle various types of updates:

type Message struct {
    Type string      `json:"type"`
    Data interface{} `json:"data"`
    ID   string      `json:"id"`
}

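// Send different types of events.
// msg is a Message; data holds msg.Data already encoded as JSON (e.g. via gjson.Encode).
response := fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n",
    msg.ID, msg.Type, data)
r.Response.Write([]byte(response))
r.Response.Flush()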

Frontend handling:

const eventSource = new EventSource('/sse');

// Handle specific event types
eventSource.addEventListener('price_update', (e) => {
    const price = JSON.parse(e.data);
    updatePriceDisplay(price);
});

eventSource.addEventListener('alert', (e) => {
    const alert = JSON.parse(e.data);
    showAlert(alert);
});

Scaling with Redis 📈

For distributed systems, use Redis pub/sub to coordinate SSE messages:

type SseManager struct {
    redis *gredis.Redis
    topic string
}

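func (sm *SseManager) HandleSSE(r *ghttp.Request) {
    ctx := r.Context()
    pubSub, _, err := sm.redis.Subscribe(ctx, sm.topic)
    if err != nil {
        g.Log().Error(ctx, err)
        r.Response.WriteStatus(http.StatusInternalServerError)
        return
    }
    defer pubSub.Close(ctx)

    // Set SSE headers once the subscription is established
    r.Response.Header().Set("Content-Type", "text/event-stream")
    r.Response.Header().Set("Cache-Control", "no-cache")
    r.Response.Header().Set("Connection", "keep-alive")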

    // Forward Redis messages to SSE
    for {
        msg, err := pubSub.ReceiveMessage(ctx)
        if err != nil {
            g.Log().Error(ctx, err)
            return
        }
        r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", msg.Payload)))
        r.Response.Flush()
    }
}

Performance Tips 🚀

  1. Batch Updates: When you have frequent updates, batch them together:
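const batchSize = 10
const batchTimeout = time.Second // intended max wait for a partial batch (timer not shown here)

var batchMessages []string

// Collect messages and flush once the batch is full.
// sendBatch is a helper you write yourself; it is not part of GoFrame.
batchMessages = append(batchMessages, msg)
if len(batchMessages) >= batchSize {
    sendBatch(r, batchMessages)
    batchMessages = batchMessages[:0]
}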
  2. Connection Limits: Prevent server overload:
type SseService struct {
    connections int32
    maxConn     int32
}

func (s *SseService) HandleSSE(r *ghttp.Request) {
    // Reserve a slot first, then check the limit. A separate load-then-add
    // would let concurrent requests slip past maxConn.
    if atomic.AddInt32(&s.connections, 1) > s.maxConn {
        atomic.AddInt32(&s.connections, -1)
        r.Response.WriteStatus(http.StatusServiceUnavailable)
        return
    }
    defer atomic.AddInt32(&s.connections, -1)
    // Handle SSE...
}
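
The batching tip above only flushes when the batch is full, so a quiet stream could hold a few messages back indefinitely. Here is a minimal sketch, under my own assumptions, of a batcher that flushes on either condition: when size messages have accumulated, or when timeout has passed since the first pending message arrived. The names batch and demo are hypothetical, and the flush callback stands in for whatever writes the batch to the client.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// batch groups messages from in and hands each group to flush when either
// size messages have accumulated or timeout has elapsed since the first
// message of the pending batch arrived. It returns once in is closed,
// after flushing any remainder. (Hypothetical helper for illustration.)
func batch(in <-chan string, size int, timeout time.Duration, flush func([]string)) {
	var pending []string
	var timer <-chan time.Time // nil while nothing is pending; a nil channel never fires
	emit := func() {
		if len(pending) > 0 {
			flush(pending)
			pending = nil // fresh slice next time, so flush may keep the old one
		}
		timer = nil
	}
	for {
		select {
		case m, ok := <-in:
			if !ok {
				emit()
				return
			}
			if len(pending) == 0 {
				timer = time.After(timeout)
			}
			pending = append(pending, m)
			if len(pending) >= size {
				emit()
			}
		case <-timer:
			emit()
		}
	}
}

// demo feeds four messages through a batcher of size 3 with a timeout long
// enough that it never fires, and returns each batch joined with commas.
func demo() []string {
	in := make(chan string)
	var batches []string
	done := make(chan struct{})
	go func() {
		batch(in, 3, time.Hour, func(b []string) {
			batches = append(batches, strings.Join(b, ","))
		})
		close(done)
	}()
	for _, m := range []string{"a", "b", "c", "d"} {
		in <- m
	}
	close(in)
	<-done
	return batches
}

func main() {
	fmt.Println(demo())
}
```

In demo the first three messages go out as one full batch and the fourth is flushed when the input channel closes. With a realistic timeout such as one second, a partial batch would go out on the timer instead.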

More Advanced Features 🔥

Event Replay Support

Implement event replay for clients that reconnect:

type EventStore struct {
    events []Event
    mu     sync.RWMutex
}

type Event struct {
    ID      string
    Type    string
    Data    interface{}
    Time    time.Time
}

func (es *EventStore) AddEvent(event Event) {
    es.mu.Lock()
    defer es.mu.Unlock()
    es.events = append(es.events, event)
    // Keep last 100 events
    if len(es.events) > 100 {
        es.events = es.events[1:]
    }
}

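func (es *EventStore) GetEventsSince(id string) []Event {
    es.mu.RLock()
    defer es.mu.RUnlock()

    start := 0 // unknown ID: replay everything we still have
    for i, e := range es.events {
        if e.ID == id {
            start = i + 1
            break
        }
    }
    // Return a copy so callers don't share the store's backing array
    return append([]Event(nil), es.events[start:]...)
}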

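// eventStore (a shared *EventStore) and sendEvent (writes one event,
// including its "id:" field) are assumed to be defined elsewhere.
func HandleSSEWithReplay(r *ghttp.Request) {
    // Browsers send Last-Event-ID automatically when EventSource reconnects
    lastEventID := r.Header.Get("Last-Event-ID")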

    if lastEventID != "" {
        // Send missed events
        events := eventStore.GetEventsSince(lastEventID)
        for _, event := range events {
            sendEvent(r, event)
        }
    }

    // Continue with normal SSE handling...
}

Client Groups and Filtering

Implement client grouping for targeted updates:

type Client struct {
    ID       string
    Groups   []string
    Channel  chan interface{}
}

type SSEBroker struct {
    clients  map[string]*Client
    groups   map[string]map[string]*Client
    mu       sync.RWMutex
}

func (b *SSEBroker) AddClient(client *Client) {
    b.mu.Lock()
    defer b.mu.Unlock()

    b.clients[client.ID] = client
    for _, group := range client.Groups {
        if b.groups[group] == nil {
            b.groups[group] = make(map[string]*Client)
        }
        b.groups[group][client.ID] = client
    }
}

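func (b *SSEBroker) BroadcastToGroup(group string, message interface{}) {
    // Hold the read lock while iterating: AddClient may modify the map
    // concurrently. The sends below never block, so the lock is held briefly.
    b.mu.RLock()
    defer b.mu.RUnlock()
    clients := b.groups[group]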

    for _, client := range clients {
        select {
        case client.Channel <- message:
        default:
            // Channel full, client might be slow
            log.Printf("Client %s message buffer full", client.ID)
        }
    }
}

Rate Limiting and Throttling

Implement rate limiting for high-frequency updates:

type ThrottledSSE struct {
    updateChan chan interface{}
    rate       time.Duration
}

func NewThrottledSSE(rate time.Duration) *ThrottledSSE {
    return &ThrottledSSE{
        updateChan: make(chan interface{}, 100),
        rate:       rate,
    }
}

func (t *ThrottledSSE) HandleUpdates(r *ghttp.Request) {
    ticker := time.NewTicker(t.rate)
    defer ticker.Stop()

    var lastUpdate interface{}

    for {
        select {
        case update := <-t.updateChan:
            lastUpdate = update

        case <-ticker.C:
            if lastUpdate != nil {
                // Send the most recent update
                data, _ := gjson.Encode(lastUpdate)
                r.Response.Write([]byte(fmt.Sprintf("data: %s\n\n", data)))
                r.Response.Flush()
                lastUpdate = nil
            }

        case <-r.Context().Done():
            return
        }
    }
}

Pro Tips 💡

  1. Use CORS headers in production
  2. Add authentication for sensitive data
  3. Monitor connection counts and server resources
  4. Test with different network conditions
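
For the first tip, the examples earlier in this post use a wildcard Access-Control-Allow-Origin. A stricter approach is to echo the request's Origin back only when it is on an allowlist. Here is a minimal sketch of that check; corsHeaders is a hypothetical helper name and the origins are placeholder values.

```go
package main

import "fmt"

// corsHeaders returns the headers to add for a request from origin, or nil
// when the origin is not on the allowlist. (Hypothetical helper.)
func corsHeaders(origin string, allowlist []string) map[string]string {
	for _, o := range allowlist {
		if o == origin {
			return map[string]string{
				"Access-Control-Allow-Origin": origin,
				// Tell caches that the response depends on the Origin header.
				"Vary": "Origin",
			}
		}
	}
	return nil
}

func main() {
	allow := []string{"https://app.example.com"} // placeholder origin
	fmt.Println(corsHeaders("https://app.example.com", allow))
	fmt.Println(corsHeaders("https://other.example.com", allow) == nil)
}
```

In a handler you would read the Origin request header, call the helper, and copy any returned entries onto the response with r.Response.Header().Set before writing the first event.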

Wrapping Up 🎉

SSE is a powerful tool for real-time updates that's often overlooked in favor of WebSockets. For one-way communication, it's simpler, more lightweight, and works great with HTTP/2. With GoFrame, implementing SSE becomes even more straightforward and maintainable.

Here's a quick checklist for your SSE implementation:

  • ✅ Basic SSE setup with proper headers
  • ✅ Error handling and connection management
  • ✅ Authentication and authorization
  • ✅ Monitoring and metrics
  • ✅ Scaling strategy
  • ✅ Resource management
  • ✅ Client handling
  • ✅ Security considerations

What's Next? 🚀

You could extend this implementation by:

  • Adding message persistence
  • Implementing message replay
  • Adding compression
  • Building client libraries
  • Adding WebSocket fallback
  • Implementing server-side filtering
  • Adding message prioritization

Have you used SSE in your projects? What challenges did you face? Share your experiences in the comments below! 👇

P.S. Want to see the complete code? Check out my GitHub repo [link to be added] for a production-ready implementation!

If you found this helpful, follow me for more Go tutorials and real-world examples! ✨
