Gatij

Realtime log streaming in Go

Almost a tail -f simulation, but done in an interesting way.

Let's tackle this problem by breaking it down into manageable tasks, providing clear explanations for each step. We'll start with an overview and then delve into each task.

Overview

  1. File Monitoring: Continuously monitor a log file for newly added content.
  2. Server Setup: Establish a server to handle incoming client connections and broadcast messages.
  3. Client Connection Handling: Manage connections and disconnections of clients.
  4. Message Broadcasting: Broadcast newly added log entries to all connected clients.
  5. Integration and Optimization: Combine the components and add concurrency control so the solution is efficient and robust.

Task Breakdown

1- File Monitoring
Goal: Set up a mechanism to monitor a log file for new additions in real time.
Steps:

  • Use the os package to open the file and check its size.
  • Continuously read the file from the last known position.
  • Detect and read newly appended content.

Implementation:

package main

import (
    "io"
    "log"
    "os"
    "time"
)

func tailFile(filePath string, lines chan<- string) {
    file, err := os.Open(filePath)
    if err != nil {
        log.Fatalf("failed to open file: %s", err)
    }
    defer file.Close()

    fi, err := file.Stat()
    if err != nil {
        log.Fatalf("failed to get file stats: %s", err)
    }

    // Start reading from end of file
    file.Seek(0, io.SeekEnd)
    offset := fi.Size()

    for {
        // Check the file size
        fi, err := file.Stat()
        if err != nil {
            log.Fatalf("failed to get file stats: %s", err)
        }

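        if fi.Size() < offset {
            // The file was truncated; start again from the beginning
            offset = 0
        }

        if fi.Size() > offset {
            // Seek to the last position
            if _, err := file.Seek(offset, io.SeekStart); err != nil {
                log.Fatalf("failed to seek file: %s", err)
            }
            buf := make([]byte, fi.Size()-offset)
            // A single Read may return fewer bytes than requested,
            // so use io.ReadFull to fill the buffer
            n, err := io.ReadFull(file, buf)
            if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
                log.Fatalf("failed to read file: %s", err)
            }

            if n > 0 {
                lines <- string(buf[:n])
                offset += int64(n)
            }
        }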

        time.Sleep(1 * time.Second)
    }
}

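This function polls the file once per second and sends any newly appended content to the lines channel. Each message is a raw chunk, so it may contain several lines or end in the middle of one.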
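
Because tailFile sends whatever bytes were appended since the last poll, a message can end in the middle of a log line. If clients need whole lines, one option is to buffer the unfinished tail between chunks. Here is a minimal sketch of that idea; lineBuffer and feed are hypothetical names of my own and are not part of the program in this post:

```go
package main

import (
	"fmt"
	"strings"
)

// lineBuffer is a hypothetical helper: it accumulates raw chunks and
// releases only complete, newline-terminated lines.
type lineBuffer struct {
	partial string // trailing text that has not yet seen its newline
}

// feed appends chunk to the buffer and returns every complete line,
// without the trailing newline. Any unfinished tail is kept for later.
func (b *lineBuffer) feed(chunk string) []string {
	parts := strings.Split(b.partial+chunk, "\n")
	// The last element is whatever follows the final newline
	// (empty if the chunk ended exactly on a line boundary).
	b.partial = parts[len(parts)-1]
	return parts[:len(parts)-1]
}

func main() {
	var b lineBuffer
	fmt.Println(b.feed("first line\nsecond ")) // "second " is held back
	fmt.Println(b.feed("half\n"))              // completes the held-back line
}
```

In the real program, something like this could sit between tailFile and the broadcaster, so that each WebSocket message carries exactly one log line.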

2- Server Setup
Goal: Set up a basic server using Gorilla WebSocket to handle client connections.
Steps:

  • Use the github.com/gorilla/websocket package.
  • Create an HTTP server that upgrades connections to WebSocket.

Implementation:

package main

import (
    "log"
    "net/http"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        // Allow all connections
        return true
    },
}

func handleConnections(w http.ResponseWriter, r *http.Request, clients map[*websocket.Conn]bool) {
    ws, err := upgrader.Upgrade(w, r, nil)
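    if err != nil {
        // Upgrade has already replied to the client with an HTTP error.
        // Log and return; log.Fatalf here would stop the whole server.
        log.Printf("failed to upgrade connection: %s", err)
        return
    }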
    defer ws.Close()

    // Register the new client
    clients[ws] = true

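    // Block on reads; a read error means the client has disconnected.
    // ReadMessage accepts any payload, whereas ReadJSON would fail on plain text.
    for {
        if _, _, err := ws.ReadMessage(); err != nil {
            delete(clients, ws)
            break
        }
    }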
}

func main() {
    clients := make(map[*websocket.Conn]bool)

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        handleConnections(w, r, clients)
    })

    log.Println("Server started on :8080")
    err := http.ListenAndServe(":8080", nil)
    if err != nil {
        log.Fatalf("failed to start server: %s", err)
    }
}

3- Client Connection Handling
Goal: Manage client connections and disconnections, ensuring robust handling.
Steps:

  • Maintain a map of active clients.
  • Safely add and remove clients.

Implementation:

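package main

import (
    "log"
    "net/http"

    "github.com/gorilla/websocket"
)

// clients holds every active connection. Access is not synchronized yet;
// step 5 adds a mutex. upgrader is the one defined in step 2.
var clients = make(map[*websocket.Conn]bool)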

func handleConnections(w http.ResponseWriter, r *http.Request) {
    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("error upgrading to websocket: %v", err)
        return
    }
    defer ws.Close()

    clients[ws] = true

    for {
        _, _, err := ws.ReadMessage()
        if err != nil {
            delete(clients, ws)
            break
        }
    }
}

4- Message Broadcasting
Goal: Broadcast new log lines to all connected clients.
Steps:

  • Read from the lines channel.
  • Broadcast to all connected clients.

Implementation:

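package main

import "github.com/gorilla/websocket"

// broadcastMessages forwards every chunk received on lines to all clients.
// It returns when the lines channel is closed.
func broadcastMessages(lines <-chan string, clients map[*websocket.Conn]bool) {
    for msg := range lines {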
        for client := range clients {
            err := client.WriteMessage(websocket.TextMessage, []byte(msg))
            if err != nil {
                client.Close()
                delete(clients, client)
            }
        }
    }
}

5- Integration and Optimization
Goal: Integrate all components and optimize performance.
Steps:

  • Combine file monitoring, server setup, and message broadcasting.
  • Add appropriate concurrency control mechanisms (channels, mutexes).

In this step, we will integrate the log file monitoring, server setup, client connection handling, and message broadcasting functionalities into a single cohesive program. We will also add concurrency control mechanisms to ensure thread safety and robustness.

Full Code Integration

package main

import (
    "io"
    "log"
    "net/http"
    "os"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

// Upgrade configuration
var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        // Allow cross-origin requests
        return true
    },
}

var (
    clients = make(map[*websocket.Conn]bool) // Map to store all active clients
    mu      sync.Mutex                       // Mutex to ensure thread safety
)

// handleConnections handles incoming websocket connections.
func handleConnections(w http.ResponseWriter, r *http.Request) {
    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("error upgrading to websocket: %v", err)
        return
    }
    defer ws.Close()

    mu.Lock()
    clients[ws] = true
    mu.Unlock()

    // Keep the connection open
    for {
        if _, _, err := ws.ReadMessage(); err != nil {
            mu.Lock()
            delete(clients, ws)
            mu.Unlock()
            ws.Close()
            break
        }
    }
}

// broadcastMessages reads from the lines channel and sends to all clients.
func broadcastMessages(lines <-chan string) {
    for msg := range lines {
        mu.Lock()
        for client := range clients {
            err := client.WriteMessage(websocket.TextMessage, []byte(msg))
            if err != nil {
                client.Close()
                delete(clients, client)
            }
        }
        mu.Unlock()
    }
}

// tailFile watches the given file for changes and sends new lines to the lines channel.
func tailFile(filePath string, lines chan<- string) {
    file, err := os.Open(filePath)
    if err != nil {
        log.Fatalf("failed to open file: %v", err)
    }
    defer file.Close()

    fi, err := file.Stat()
    if err != nil {
        log.Fatalf("failed to get file stats: %v", err)
    }

    // Start reading from end of file
    file.Seek(0, io.SeekEnd)
    offset := fi.Size()

    for {
        fi, err := file.Stat()
        if err != nil {
            log.Fatalf("failed to get file stats: %v", err)
        }

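        if fi.Size() < offset {
            // The file was truncated; start again from the beginning
            offset = 0
        }

        if fi.Size() > offset {
            // Seek to the last position
            if _, err := file.Seek(offset, io.SeekStart); err != nil {
                log.Fatalf("failed to seek file: %v", err)
            }
            buf := make([]byte, fi.Size()-offset)
            // A single Read may return fewer bytes than requested,
            // so use io.ReadFull to fill the buffer
            n, err := io.ReadFull(file, buf)
            if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
                log.Fatalf("failed to read file: %v", err)
            }

            if n > 0 {
                lines <- string(buf[:n])
                offset += int64(n)
            }
        }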

        time.Sleep(1 * time.Second)
    }
}

// main function to start the server and initialize goroutines.
func main() {
    lines := make(chan string)

    go tailFile("test.log", lines)       // Start file tailing in a goroutine
    go broadcastMessages(lines)         // Start broadcasting messages in a goroutine

    http.HandleFunc("/ws", handleConnections) // Websocket endpoint

    log.Println("Server started on :8080")
    err := http.ListenAndServe(":8080", nil) // Start HTTP server
    if err != nil {
        log.Fatalf("Failed to start server: %v", err)
    }
}

Code Explanation:

File Monitoring:

  • The tailFile function is run in a goroutine, continuously monitoring the log file for new content and sending new lines to a channel (lines).

Server Setup:

  • The HTTP server registers the handler with http.HandleFunc("/ws", handleConnections). The handler upgrades each HTTP connection to a WebSocket using the Gorilla WebSocket library.

Client Handling:

  • Clients are handled in handleConnections. Connections are upgraded to WebSocket, and each connection is managed in a map called clients.
  • Mutex (mu) is used to ensure thread safety while adding or removing clients.

Message Broadcasting:

  • The broadcastMessages function reads from the lines channel and sends the content to all connected clients.
  • The function runs in its own goroutine and uses the mutex to ensure thread safety when accessing the clients map.
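
One thing to keep in mind: broadcastMessages holds the mutex while it writes to each client, so a single slow client can delay everyone else as well as new registrations. A common way around this is to give each client its own buffered queue and never block on it. The sketch below shows only that queueing idea using the standard library; hub, register and broadcast are hypothetical names, and in the real server a per-client goroutine would drain the queue and write to the WebSocket connection:

```go
package main

import (
	"fmt"
	"sync"
)

// client is a hypothetical wrapper. In the real server it would also hold
// the *websocket.Conn, and a per-client goroutine would drain send.
type client struct {
	send chan string
}

type hub struct {
	mu      sync.Mutex
	clients map[*client]struct{}
}

func newHub() *hub {
	return &hub{clients: make(map[*client]struct{})}
}

// register adds a client whose queue can hold buffer pending messages.
func (h *hub) register(buffer int) *client {
	c := &client{send: make(chan string, buffer)}
	h.mu.Lock()
	h.clients[c] = struct{}{}
	h.mu.Unlock()
	return c
}

// broadcast queues msg for every client without blocking. A client whose
// queue is full is treated as too slow: it is removed and its channel is
// closed. The return value is the number of clients dropped.
func (h *hub) broadcast(msg string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	dropped := 0
	for c := range h.clients {
		select {
		case c.send <- msg:
		default:
			delete(h.clients, c)
			close(c.send)
			dropped++
		}
	}
	return dropped
}

func main() {
	h := newHub()
	fast := h.register(4)
	slow := h.register(1)

	h.broadcast("line 1")            // both queues have room
	dropped := h.broadcast("line 2") // slow's queue is full by now

	fmt.Println("dropped:", dropped)
	fmt.Println("queued for fast:", len(fast.send))
	_, open := <-slow.send // still delivers the message queued earlier
	fmt.Println("slow had a pending message:", open)
}
```

Dropping a slow client is a design choice, not the only option; you could also skip the message for that client and keep the connection.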

Integration and Optimization:

  • All components are integrated and run concurrently using goroutines.
  • Synchronization is handled with a mutex to ensure operations on the clients map are thread-safe.

Running the Program

1- Save the code in a file, for instance, main.go.
2- Ensure you have the Gorilla WebSocket package installed (run go mod init first if the directory has no go.mod yet):

go get github.com/gorilla/websocket

3- Make sure test.log exists in the same directory (tailFile exits if it cannot open the file), then run the Go program:

go run main.go

4- Use a WebSocket client to connect to ws://localhost:8080/ws.

Any WebSocket client will work. Below, I'll provide instructions for connecting with a CLI tool.

Using a CLI Tool: websocat

websocat is a simple WebSocket client for the command line. You can install it and use it to connect to your WebSocket server.

Installation:

  • On macOS, you can install websocat using Homebrew:
brew install websocat
  • On Ubuntu, you can install it via Snap:
sudo snap install websocat

You can also download the binary directly from the GitHub releases page.

Usage:

To connect to your WebSocket server running at ws://localhost:8080/ws, you can use:

websocat ws://localhost:8080/ws

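Anything appended to test.log will show up in the terminal within about a second. You can also type a message and hit Enter to send it, but this server reads client messages only to detect disconnects and otherwise ignores them.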

WebSockets are a widely used protocol for real-time, bidirectional communication between clients and servers. However, they do come with some limitations. Let's discuss these limitations and explore some alternatives that might be more suitable depending on the use case.

Limitations of Using WebSocket

Scalability: While WebSockets are effective for low to moderate traffic, scaling to handle a large number of concurrent connections can be challenging. This often requires sophisticated load balancing and infrastructure management.

State Management: WebSockets are stateful, which means each connection maintains its own state. This can become complicated when scaling horizontally because you need to ensure that sessions are properly managed across multiple servers (e.g., using sticky sessions or a distributed session store).

Resource Intensive: Each WebSocket connection consumes server resources. If you have many clients, this can rapidly consume memory and processing power, necessitating robust resource management.

Firewalls and Proxies: Some corporate firewalls and proxy servers block WebSocket connections because they don’t conform to the traditional HTTP request-response model. This can limit the accessibility of your application.

Security: Although WebSockets can be used over encrypted connections (wss://), they can still be vulnerable to attacks such as cross-site WebSocket hijacking (CSWSH). Ensuring robust security measures is essential.

Latency: WebSockets have low latency, but they run over TCP, so a lost packet delays every message queued behind it. That makes them a weaker fit for applications that need ultra-low latency or where the timing of messages is critical.

Alternatives to WebSocket

1- Server-Sent Events (SSE)

SSE is a standard allowing servers to push notifications to clients in a unidirectional stream over HTTP.
It is simpler to implement than WebSockets and works natively in many browsers without requiring additional libraries.
Use Cases:

Real-time updates like live feeds, notifications, or social media updates where the data flow is largely unidirectional (server to client).

  • Pros:
    Simpler protocol and easier to implement.
    Built-in reconnection logic.
    Less resource-intensive than WebSockets for unidirectional data flow.

  • Cons:
    Unidirectional (server-to-client) only.
    Less suitable for applications requiring bi-directional communication.

Example:

const eventSource = new EventSource('http://localhost:8080/events');

eventSource.onmessage = function(event) {
    console.log('New message from server: ', event.data);
};

2- HTTP/2 and HTTP/3

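The newer versions of HTTP (HTTP/2 and HTTP/3) support persistent connections and multiplexing, so a long-lived streaming response can share one connection with other requests.
They also define server push, which lets the server send responses the client has not requested yet. Keep in mind that push is meant for cacheable resources rather than arbitrary messages, and some major browsers, including Chrome, have removed support for it.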

Use Cases:
When you need to improve the performance and latency of web applications that already use HTTP for communication.

  • Pros:
    Improved performance and lower latency due to multiplexing.
    Better support and broader compatibility with existing HTTP infrastructure.

  • Cons:
    Requires updating server infrastructure to support HTTP/2 or HTTP/3.
    More complex than HTTP/1.1.

3- WebRTC

WebRTC (Web Real-Time Communication) is a technology designed for peer-to-peer communication, primarily for audio and video streaming.
It can also be used for real-time data transfer.

Use Cases:
Real-time audio and video communication.
Peer-to-peer file sharing or live streaming.

  • Pros:
    Peer-to-peer connections reduce server load.
    Built-in support for NAT traversal and encryption.

  • Cons:
    More complex to implement than WebSockets or SSE.
    Requires good understanding of signaling and peer connection management.

4- Message Brokers (e.g., MQTT, AMQP)

Protocols like MQTT and AMQP are designed for message queuing and are optimized for different use cases.
MQTT is lightweight and commonly used in IoT devices.
AMQP is more robust and feature-rich, suited for enterprise-level messaging.

Use Cases:
IoT applications.
Distributed systems requiring reliable message delivery.
Applications with complex routing and message queuing needs.

  • Pros:
    Robust and feature-rich (especially AMQP).
    Suitable for unreliable and constrained networks (especially MQTT).

  • Cons:
    Introduces extra infrastructure complexity.
    Requires a message broker server and usually more setup.

Summary

Depending on your specific requirements, WebSockets might still be a good choice. However, if you encounter limitations in terms of scalability, complexity, or suitability, considering one of the alternatives like Server-Sent Events (SSE), HTTP/2/3, WebRTC, or specialized message brokers like MQTT or AMQP might be more appropriate. Each of these alternatives has its own strengths and best-use scenarios, and understanding these will help you choose the most suitable technology for your application.

Top comments (1)

Ende • Edited

Very cool. I made a similar project which can be found here:
github.com/endepointe/watchlog

We should consolidate all these log streaming demos into a repo. Vector.dev is a solid service as well.

tail -f is the winner here :)

Good job!