Kazuki Higashiguchi

Reverse HTTP proxy over WebSocket in Go (Part 5)

Series introduction

In part 4, I talked about how to relay a TCP connection from "App" to the peer of the WebSocket connection, specifically how to take an available WebSocket connection from the pools to relay received requests.

In this post, I will explain how to send the request to the peer through the WebSocket connection.

  • Start a WebSocket server (Part 1)
  • Establish a WebSocket connection (Part 2)
  • Relay TCP connection from "App" to the peer of WebSocket (Part 3 | Part 4 | Part 5)
  • Relay TCP connection in WebSocket data to "internal API"
  • Keep an established connection

Reverse HTTP proxy over WebSocket

A reverse HTTP proxy over WebSocket is a type of proxy that retrieves resources from servers on behalf of a client and uses the WebSocket protocol as a "tunnel" to pass TCP communication from server to client.

A network diagram for reverse proxy over WebSocket

I'll introduce a Go sample project forked from root-gg/wsp (I forked it because maintenance has stopped and the Go language and library versions needed to be updated).

hgsgtk/wsp: HTTP tunnel over WebSocket

Send the request to the peer through the WebSocket connection

Let's continue with the HTTP handler code from the previous part, which waits for requests from "app" on the endpoint /requests/.

func (s *Server) request(w http.ResponseWriter, r *http.Request) {
    // (omit): [1]: Receive requests to be proxied

    // [2]: Take a WebSocket connection available from pools for relaying received requests.
    request := NewConnectionRequest(s.Config.GetTimeout())
    // "Dispatcher" is running in a separate thread from the server by `go s.dispatchConnections()`.
    // It waits to receive requests to dispatch connection from available pools to clients requests.
    // https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L93
    //
    // Notify request from handler to dispatcher through Server.dispatcher channel.
    s.dispatcher <- request
    // Dispatcher tries to find an available connection pool,
    // and it returns the connection through Server.connection channel.
    // https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L189
    //
    // Here waiting for a result from dispatcher.
    connection := <-request.connection
    if connection == nil {
        // It means that dispatcher has set `nil` which is a system error case that is
        // not expected in the normal flow.
        wsp.ProxyErrorf(w, "Unable to get a proxy connection")
        return
    }

    // [3]: Send the request to the peer through the WebSocket connection.
    if err := connection.proxyRequest(w, r); err != nil {
        // An error occurred, so throw the connection away
        log.Println(err)
        connection.Close()

        // Try to return an error to the client
        // This might fail if response headers have already been sent
        wsp.ProxyError(w, err)
    }
}

In the implementation up to part 4, we were able to identify the WebSocket connection to be relayed. Next, send an HTTP request over the connection.

connection.proxyRequest does it. Let's look at the implementation. There are six steps.

  1. Serialize HTTP request
  2. Send the HTTP request to the peer
  3. Wait until the HTTP response is ready
  4. Read the HTTP response from the peer
  5. Wait until the HTTP response body is ready
  6. Read the HTTP response body from the peer

Here is a diagram describing the steps of communication over the WebSocket connection.

A diagram describing the steps of communication over the WebSocket connection

1. Serialize HTTP request / 2. Send the HTTP request to the peer

First, it serializes the HTTP request and sends it to the WebSocket connection peer.

func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err error) {
    log.Printf("proxy request to %s", connection.pool.id)

    // [1]: Serialize HTTP request
    jsonReq, err := json.Marshal(wsp.SerializeHTTPRequest(r))
    if err != nil {
        return fmt.Errorf("unable to serialize request : %w", err)
    }

    // [2]: Send the HTTP request to the peer
    // Send the serialized HTTP request to the peer
    if err := connection.ws.WriteMessage(websocket.TextMessage, jsonReq); err != nil {
        return fmt.Errorf("unable to write request : %w", err)
    }

    // Pipe the HTTP request body to the peer
    bodyWriter, err := connection.ws.NextWriter(websocket.BinaryMessage)
    if err != nil {
        return fmt.Errorf("unable to get request body writer : %w", err)
    }
    if _, err := io.Copy(bodyWriter, r.Body); err != nil {
        return fmt.Errorf("unable to pipe request body : %w", err)
    }
    if err := bodyWriter.Close(); err != nil {
        return fmt.Errorf("unable to pipe request body (close) : %w", err)
    }

In the WebSocket protocol, the kind of message is identified by the opcode defined in RFC 6455. Text frames and binary frames are used for data exchange.

Opcode  Meaning
0       Continuation Frame
1       Text Frame
2       Binary Frame
8       Connection Close Frame
9       Ping Frame
10      Pong Frame
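
As a rough illustration of the table, the opcodes can be written as Go constants. The constant and function names below are made up for this sketch; gorilla/websocket exposes its own constants such as websocket.TextMessage (1) and websocket.BinaryMessage (2). The control-frame check relies on RFC 6455 setting the most significant bit of the 4-bit opcode for control frames.

```go
package main

import "fmt"

// Opcode values from RFC 6455. The names are hypothetical and only used
// in this sketch.
const (
	opContinuation = 0
	opText         = 1
	opBinary       = 2
	opClose        = 8
	opPing         = 9
	opPong         = 10
)

// isControl reports whether the opcode denotes a control frame.
// Control frames have the most significant bit of the 4-bit opcode set.
func isControl(opcode int) bool {
	return opcode&0x8 != 0
}

// startsDataMessage reports whether the opcode starts a text or binary message.
func startsDataMessage(opcode int) bool {
	return opcode == opText || opcode == opBinary
}

func main() {
	for _, op := range []int{opContinuation, opText, opBinary, opClose, opPing, opPong} {
		fmt.Printf("opcode %2d: control=%t startsData=%t\n", op, isControl(op), startsDataMessage(op))
	}
}
```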

In this case, it sends the JSON-serialized HTTP metadata (URL, method, header, and Content-Length) in a text frame, and sends the request body in a binary frame.

type HTTPRequest struct {
    Method        string
    URL           string
    Header        map[string][]string
    ContentLength int64
}

func SerializeHTTPRequest(req *http.Request) (r *HTTPRequest) {
    r = new(HTTPRequest)
    r.URL = req.URL.String()
    r.Method = req.Method
    r.Header = req.Header
    r.ContentLength = req.ContentLength
    return
}

SerializeHTTPRequest copies the request metadata into an HTTPRequest struct that can be encoded as JSON.

req.Header is a value whose type is http.Header. A Header represents the key-value pairs in an HTTP header.

type Header map[string][]string

For example, the headers of a request sent with curl look like this when encoded as JSON:

{
    "Accept":["*/*"],
    "User-Agent":["curl/7.77.0"],
    "X-Proxy-Destination":["http://localhost:8081/hello"]
}

json.Marshal returns the JSON encoding of the value. Struct values encode as JSON objects.

jsonReq, err := json.Marshal(wsp.SerializeHTTPRequest(r))

Each exported struct field becomes a member of the object. To customize how a struct field is encoded, use the field's json struct tag.

type HTTPRequest struct {
    Method string `json:"customized"` // {"customized":"GET"}
}
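
To make the encoding concrete, here is a minimal, standalone sketch that serializes request metadata and decodes it again, roughly as the peer would. The helpers sampleRequest, serialize, deserialize, and encodeTagged, as well as the taggedRequest type, are hypothetical names used only for this illustration; the request itself is built with httptest.NewRequest instead of coming from a real client.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// HTTPRequest mirrors the struct shown above. It has no json tags, so the
// JSON keys are the exported field names.
type HTTPRequest struct {
	Method        string
	URL           string
	Header        map[string][]string
	ContentLength int64
}

// taggedRequest shows how a json tag renames a key (illustrative only).
type taggedRequest struct {
	Method string `json:"method"`
}

// sampleRequest builds a request similar to the curl example above.
func sampleRequest() *http.Request {
	req := httptest.NewRequest(http.MethodGet, "/requests/", nil)
	req.Header.Set("X-Proxy-Destination", "http://localhost:8081/hello")
	return req
}

// serialize copies the request metadata into HTTPRequest and encodes it as JSON.
func serialize(req *http.Request) ([]byte, error) {
	return json.Marshal(&HTTPRequest{
		Method:        req.Method,
		URL:           req.URL.String(),
		Header:        req.Header,
		ContentLength: req.ContentLength,
	})
}

// deserialize decodes the JSON back into an HTTPRequest.
func deserialize(b []byte) (HTTPRequest, error) {
	var r HTTPRequest
	err := json.Unmarshal(b, &r)
	return r, err
}

// encodeTagged encodes a taggedRequest, whose key is renamed by its tag.
func encodeTagged(method string) (string, error) {
	b, err := json.Marshal(taggedRequest{Method: method})
	return string(b), err
}

func main() {
	b, err := serialize(sampleRequest())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))

	tagged, err := encodeTagged("GET")
	if err != nil {
		panic(err)
	}
	fmt.Println(tagged)
}
```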

Once serialization is complete, send the JSON data.

connection.ws.WriteMessage(websocket.TextMessage, jsonReq)

WriteMessage is a method of websocket.Conn that writes a message to the connection, and websocket.TextMessage is a constant denoting a text data message.

Then, pipe the HTTP request body to the WebSocket connection peer.

// Pipe the HTTP request body to the peer
bodyWriter, err := connection.ws.NextWriter(websocket.BinaryMessage)
if err != nil {
    return fmt.Errorf("unable to get request body writer : %w", err)
}
if _, err := io.Copy(bodyWriter, r.Body); err != nil {
    return fmt.Errorf("unable to pipe request body : %w", err)
}
if err := bodyWriter.Close(); err != nil {
    return fmt.Errorf("unable to pipe request body (close) : %w", err)
}

NextWriter, a method of websocket.Conn, returns a writer for the next message to send. An application can send and receive messages using the io.WriteCloser and io.Reader interfaces. In this case, the content of r.Body is written to the writer, and closing the writer flushes the complete message to the network.
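
The copy-then-close pattern can be tried without a real WebSocket connection. The sketch below is only an approximation: messageWriter is a hypothetical stand-in for the io.WriteCloser returned by NextWriter, and it simply buffers the data and treats Close as the moment the message is "sent". The helper names pipeBody and demo are also made up for this example.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// messageWriter is a hypothetical stand-in for the writer returned by
// NextWriter. It buffers writes and records the message when closed.
type messageWriter struct {
	buf  bytes.Buffer
	sent []byte // set when Close "sends" the message
}

func (m *messageWriter) Write(p []byte) (int, error) { return m.buf.Write(p) }

func (m *messageWriter) Close() error {
	m.sent = append([]byte(nil), m.buf.Bytes()...)
	return nil
}

// pipeBody streams body into w and closes w to complete the message.
func pipeBody(w io.WriteCloser, body io.Reader) error {
	if _, err := io.Copy(w, body); err != nil {
		return fmt.Errorf("unable to pipe request body: %w", err)
	}
	if err := w.Close(); err != nil {
		return fmt.Errorf("unable to close body writer: %w", err)
	}
	return nil
}

// demo pipes a string body and returns what the stand-in writer sent.
func demo(body string) (string, error) {
	w := &messageWriter{}
	if err := pipeBody(w, strings.NewReader(body)); err != nil {
		return "", err
	}
	return string(w.sent), nil
}

func main() {
	sent, err := demo(`{"name":"gopher"}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("sent %d bytes: %s\n", len(sent), sent)
}
```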

3. Wait until the HTTP response is ready to read

Let's move on to the rest of the code, which receives an HTTP response and parses it. This is one of the difficult points in proxying an HTTP request over a WebSocket connection, because the wsp_server does not know when the wsp_client will write the HTTP response to the WebSocket connection. We need to implement a way to wait for the response to come back.

A diagram describing the communication flow between wsp_server and wsp_client

I'll show you the whole picture first to address the issue.

The whole picture

In advance, a separate goroutine, the "reader", runs the read function and waits to receive the HTTP response.

type Connection struct {
    // (omit)
    nextResponse chan chan io.Reader
}

// NewConnection returns a new Connection.
func NewConnection(pool *Pool, ws *websocket.Conn) *Connection {
    // (omit: Initialize a new Connection)

    // Start to listen to incoming messages over the WebSocket connection
    go c.read()

    return c
}

func (connection *Connection) read() {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("WebSocket crash recovered : %s", r)
        }
        connection.Close()
    }()

    for {
        if connection.status == Closed {
            break
        }

        // https://godoc.org/github.com/gorilla/websocket#hdr-Control_Messages
        _, reader, err := connection.ws.NextReader()
        if err != nil {
            break
        }

        if connection.status != Busy {
            // We received a wild unexpected message
            break
        }

        // When it gets here, it is expected to be either a HttpResponse or a HttpResponseBody has been returned.
        //
        // Next, it waits to receive the value from the Connection.proxyRequest function that is invoked in the "server" thread.
        c := <-connection.nextResponse
        if c == nil {
            // We have been unlocked by Close()
            break
        }

        // Send the reader back to Connection.proxyRequest
        c <- reader

        // Wait for proxyRequest to close the channel,
        // which notifies us that it is done with the reader
        <-c
    }
}

When a new WebSocket connection is established, it starts a goroutine that listens for incoming messages over the WebSocket connection. The goroutine keeps running until the connection is closed or an unexpected error occurs. In this loop, the NextReader method of websocket.Conn is used to read incoming messages.

// read loop
for {
    _, reader, err := connection.ws.NextReader()
    if err != nil {
        break
    }
}

NextReader returns the next data message received from the peer. In this case, it means the message from a wsp_client.

func (c *Conn) NextReader() (messageType int, r io.Reader, err error)

The returned messageType, which corresponds to the opcode in RFC 6455, is either TextMessage or BinaryMessage. In addition to these opcodes, the WebSocket protocol defines three control messages: close (8: Connection Close Frame), ping (9: Ping Frame), and pong (10: Pong Frame). If you implement WebSocket messaging from scratch, you need to process these control messages yourself, but gorilla/websocket is designed so that you don't have to be concerned about them.

If the application is not otherwise interested in messages from the peer, then the application should start a goroutine to read and discard messages from the peer.

https://pkg.go.dev/github.com/gorilla/websocket@v1.4.2#hdr-Control_Messages

In addition, the application must break out of the read loop when NextReader returns a non-nil error value. If you do not break out of the loop and keep calling NextReader on the failed connection, gorilla/websocket panics after 1000 repeated reads, as shown here:

// gorilla/websocket conn.go
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
    // (omit)

    // Applications that do handle the error returned from this method spin in
    // tight loop on connection failure. To help application developers detect
    // this error, panic on repeated reads to the failed connection.
    c.readErrCount++
    if c.readErrCount >= 1000 {
        panic("repeated read on failed websocket connection")
    }

    return noFrame, nil, c.readErr
}
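
The rule "break out of the read loop on the first error" can be sketched without a network connection. Here fakeConn is a hypothetical stand-in whose NextReader has a simplified signature (it does not return a message type); it yields the queued messages and then fails on every later call, like a closed connection would. The names readLoop and demo are also assumptions of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// errClosed is returned by fakeConn once all messages are consumed.
var errClosed = errors.New("connection closed")

// fakeConn is a hypothetical stand-in for a WebSocket connection.
type fakeConn struct {
	messages []string
	calls    int // number of NextReader calls, to show the loop stops
}

func (f *fakeConn) NextReader() (io.Reader, error) {
	f.calls++
	if len(f.messages) == 0 {
		return nil, errClosed
	}
	msg := f.messages[0]
	f.messages = f.messages[1:]
	return strings.NewReader(msg), nil
}

// readLoop reads messages until NextReader fails, then stops immediately
// instead of retrying on the failed connection.
func readLoop(f *fakeConn) []string {
	var got []string
	for {
		r, err := f.NextReader()
		if err != nil {
			break // never call NextReader again after an error
		}
		b, err := io.ReadAll(r)
		if err != nil {
			break
		}
		got = append(got, string(b))
	}
	return got
}

// demo returns the messages read and the number of NextReader calls.
func demo(messages ...string) ([]string, int) {
	f := &fakeConn{messages: messages}
	got := readLoop(f)
	return got, f.calls
}

func main() {
	got, calls := demo("response", "body")
	fmt.Printf("read %d messages with %d NextReader calls\n", len(got), calls)
}
```

With two queued messages the loop calls NextReader three times: twice successfully and once to observe the error, after which it stops.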

NextReader blocks until a message is received or the WebSocket connection is closed. The following code in gorilla/websocket keeps reading frames until a TextMessage or BinaryMessage is received. Note that a data frame is a message format defined in the WebSocket protocol (RFC 6455).

// gorilla/websocket conn.go
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
    // (omit)

    for c.readErr == nil {
        frameType, err := c.advanceFrame()
        if err != nil {
            c.readErr = hideTempErr(err)
            break
        }

        if frameType == TextMessage || frameType == BinaryMessage {
            c.messageReader = &messageReader{c}
            c.reader = c.messageReader
            if c.readDecompress {
                c.reader = c.newDecompressionReader(c.reader)
            }
            return frameType, c.reader, nil
        }
    }

    // (omit)
}

After a message is received from the peer, it waits to receive the value from the Connection.proxyRequest function that is invoked in the "server" thread.


func (connection *Connection) read() {
    // (omit)
    for {
        // (omit)

        c := <-connection.nextResponse
        if c == nil {
            // We have been unlocked by Close()
            break
        }

        // Send the reader back to Connection.proxyRequest
        c <- reader

        // (omit)
    }
}

When a "server" thread proxies a request, it sends the HTTP request over the WebSocket connection, then sends a channel of io.Reader (chan io.Reader) to the nextResponse field. It then waits until the "reader" thread writes a value to that channel.

After the "reader" thread detects that the peer has written the HTTP response to the WebSocket connection, it sends the io.Reader for that message to the channel (chan io.Reader), and the "server" thread can proceed with the remaining steps.

type Connection struct {
    // (omit)
    nextResponse chan chan io.Reader
}

func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err error) {
    // (omit) [1]: Serialize HTTP request
    // (omit) [2]: Send the HTTP request to the peer

    // [3]: Wait until the HTTP response is ready to read
    responseChannel := make(chan (io.Reader))
    connection.nextResponse <- responseChannel
    responseReader, ok := <-responseChannel
    if responseReader == nil {
        if ok {
            // ok is true, so the channel is still open and a nil reader was sent.
            // Close it here. If ok were false, the channel would already be closed.
            // See "Receive operator" in https://go.dev/ref/spec for more information.
            close(responseChannel)
        }
        // err is nil at this point, so there is no error to wrap with %w.
        return fmt.Errorf("unable to get http response reader")
    }

    // (omit)
}

When we get to this point, we are ready to read the response written over the WebSocket connection.
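
The nil check combined with the ok flag distinguishes three outcomes of a channel receive. The following standalone sketch illustrates them; the classify and demo helpers and the buffered channels exist only for this illustration and are not part of wsp.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// classify receives once from ch and describes what arrived.
func classify(ch chan io.Reader) string {
	r, ok := <-ch
	switch {
	case r != nil:
		return "reader"
	case ok:
		// A nil value was sent explicitly; the channel is still open.
		return "nil value, channel still open"
	default:
		// Receiving from a closed channel yields the zero value and ok == false.
		return "channel closed"
	}
}

// demo runs classify against the three situations.
func demo() []string {
	var out []string

	withReader := make(chan io.Reader, 1)
	withReader <- strings.NewReader("response")
	out = append(out, classify(withReader))

	withNil := make(chan io.Reader, 1)
	withNil <- nil
	out = append(out, classify(withNil))

	closed := make(chan io.Reader)
	close(closed)
	out = append(out, classify(closed))

	return out
}

func main() {
	for _, s := range demo() {
		fmt.Println(s)
	}
}
```

Only in the second case is it safe to close the channel, which is why the close call is guarded by ok.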

4. Read the HTTP response from the peer

Let's move on to the next step: reading the HTTP response from the peer, which gives us the serialized HTTP response.

type HTTPResponse struct {
    StatusCode    int
    Header        http.Header
    ContentLength int64
}

func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err error) {
    // (omit)

    // [4]: Read the HTTP response from the peer
    // Get the serialized HTTP Response from the peer
    jsonResponse, err := io.ReadAll(responseReader)
    if err != nil {
        close(responseChannel)
        return fmt.Errorf("unable to read http response : %w", err)
    }

    // Notify the read() goroutine that we are done reading the response
    close(responseChannel)

    // Deserialize the HTTP Response
    httpResponse := new(wsp.HTTPResponse)
    if err := json.Unmarshal(jsonResponse, httpResponse); err != nil {
        return fmt.Errorf("unable to unserialize http response : %w", err)
    }

    // Write response headers back to the client
    for header, values := range httpResponse.Header {
        for _, value := range values {
            w.Header().Add(header, value)
        }
    }
    w.WriteHeader(httpResponse.StatusCode)

    // (omit)
}

The value responseReader implements the io.Reader interface, which is why we can read its data with io.ReadAll (added in Go 1.16).

After that, it closes the channel responseChannel to notify the "reader" thread that the response has been read.

func (connection *Connection) read() {
    // (omit)
    for {
        // (omit)

        // Send the reader back to Connection.proxyRequest
        c <- reader

        // Wait for proxyRequest to close the channel,
        // which notifies us that it is done with the reader
        <-c
    }
}

The "reader" thread waits until the "server" thread closes the responseChannel channel because of the requirements of websocket.NextReader. As explained in godoc, we need to consider several specifications:

  • No more than one goroutine calls the read methods (NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler) concurrently
  • Connections support one concurrent reader and one concurrent writer

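There can also be at most one open reader on a connection: NextReader discards the previous message if the application has not already consumed it. That's why we need to wait until the reader returned by NextReader has been consumed before requesting the next one.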
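
The hand-off between the two goroutines can be simulated with the standard library alone. In this sketch, conn is a simplified stand-in for Connection, and the incoming channel replaces the WebSocket connection (each string is one message from the peer). These names, along with receive and demo, are assumptions of the sketch and do not appear in wsp.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// conn is a simplified stand-in for Connection.
type conn struct {
	incoming     chan string         // replaces the WebSocket: one string per message
	nextResponse chan chan io.Reader // same shape as Connection.nextResponse
}

// read plays the role of the "reader" goroutine.
func (c *conn) read() {
	for msg := range c.incoming {
		reader := strings.NewReader(msg)
		// Wait until a caller asks for the next message.
		ch := <-c.nextResponse
		if ch == nil {
			return
		}
		// Hand the reader over to the caller.
		ch <- reader
		// Block until the caller closes ch, meaning it is done with the reader.
		<-ch
	}
}

// receive plays the role of the "server" side in proxyRequest.
func (c *conn) receive() (string, error) {
	ch := make(chan io.Reader)
	c.nextResponse <- ch
	reader := <-ch
	// Closing ch tells the reader goroutine that we are done with reader.
	defer close(ch)
	b, err := io.ReadAll(reader)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// demo feeds the messages through the hand-off and returns them in order.
func demo(messages ...string) ([]string, error) {
	c := &conn{
		incoming:     make(chan string),
		nextResponse: make(chan chan io.Reader),
	}
	go c.read()
	go func() {
		for _, m := range messages {
			c.incoming <- m
		}
		close(c.incoming)
	}()

	var got []string
	for range messages {
		s, err := c.receive()
		if err != nil {
			return nil, err
		}
		got = append(got, s)
	}
	return got, nil
}

func main() {
	got, err := demo("response headers", "response body")
	if err != nil {
		panic(err)
	}
	fmt.Println(got)
}
```

Because the reader goroutine blocks on the channel until it is closed, it never moves on to the second message while the first reader is still in use.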

Next, unmarshal the JSON into the HTTPResponse struct with json.Unmarshal. json.Unmarshal matches incoming JSON object keys to the keys used by Marshal, either the struct field name or its tag (json:"xxx").

Now we can get the HTTP response headers and status code.
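
As a small, self-contained check of this step, the sketch below decodes a serialized response and writes its status code and headers to an httptest.ResponseRecorder. The helper names writeHead and demo and the sample payload are made up for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// HTTPResponse mirrors the struct shown above.
type HTTPResponse struct {
	StatusCode    int
	Header        http.Header
	ContentLength int64
}

// writeHead decodes the serialized response and copies its headers and
// status code to w.
func writeHead(w http.ResponseWriter, jsonResponse []byte) error {
	resp := new(HTTPResponse)
	if err := json.Unmarshal(jsonResponse, resp); err != nil {
		return fmt.Errorf("unable to deserialize http response: %w", err)
	}
	for name, values := range resp.Header {
		for _, value := range values {
			w.Header().Add(name, value)
		}
	}
	// Headers must be set before WriteHeader is called.
	w.WriteHeader(resp.StatusCode)
	return nil
}

// demo runs writeHead against a recorder and returns the recorded
// status code and Content-Type header.
func demo(payload string) (int, string, error) {
	rec := httptest.NewRecorder()
	if err := writeHead(rec, []byte(payload)); err != nil {
		return 0, "", err
	}
	res := rec.Result()
	return res.StatusCode, res.Header.Get("Content-Type"), nil
}

func main() {
	status, contentType, err := demo(`{"StatusCode":201,"Header":{"Content-Type":["application/json"]},"ContentLength":2}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(status, contentType)
}
```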

5. Wait until the HTTP response body is ready / 6. Read the HTTP response body from the peer

Finally, follow the same steps once more to read the HTTP response body.

func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err error) {
    // (omit)

    // [5]: Wait until the HTTP response body is ready
    // Get the HTTP response body from the peer.
    // To do so, send a new channel to the read() goroutine
    // to get the next message reader
    responseBodyChannel := make(chan (io.Reader))
    connection.nextResponse <- responseBodyChannel
    responseBodyReader, ok := <-responseBodyChannel
    if responseBodyReader == nil {
        if ok {
            // ok is true, so the channel is still open and must be closed here.
            // Close responseBodyChannel, not responseChannel: responseChannel was
            // already closed in step [4], and closing it again would panic.
            close(responseBodyChannel)
        }
        // err is nil at this point, so there is no error to wrap with %w.
        return fmt.Errorf("unable to get http response body reader")
    }

    // [6]: Read the HTTP response body from the peer
    // Pipe the HTTP response body right from the remote Proxy to the client
    if _, err := io.Copy(w, responseBodyReader); err != nil {
        close(responseBodyChannel)
        return fmt.Errorf("unable to pipe response body : %w", err)
    }

    // Notify read() that we are done reading the response body
    close(responseBodyChannel)

    connection.Release()

    return
}

Conclusion

Following part 3 and part 4, I explained how to relay a TCP connection from "App" to the peer of the WebSocket connection.

In part 6, I'll explain how to relay the TCP connection in WebSocket data to the "internal API".