Series introduction
In part 4, I talked about how to relay a TCP connection from "App" to the peer of the WebSocket connection, in particular how to take an available WebSocket connection from the pools to relay a received request.
Reverse HTTP proxy over WebSocket in Go (Part 4)
Kazuki Higashiguchi ・ Dec 28 '21
In this post, I will talk about how to send the request to the peer through the WebSocket connection.
- Start a WebSocket server (Part 1)
- Establish a WebSocket connection (Part 2)
- Relay TCP connection from "App" to the peer of WebSocket (Part 3 | Part 4 | Part 5)
- Relay TCP connection in WebSocket data to "internal API"
- Keep an established connection
Reverse HTTP proxy over WebSocket
A reverse HTTP proxy over WebSocket is a type of proxy that retrieves resources from servers on behalf of a client and uses the WebSocket protocol as a "tunnel" to pass TCP communication from server to client.
I'll introduce a Go sample project forked from root-gg/wsp (I forked it because maintenance has stopped and the versions of the Go language and libraries needed to be updated).
Send the request to the peer through the WebSocket connection
Let's take a look at the continuation of the previous HTTP handler code, which waits for requests from "app" at the endpoint /requests/.
func (s *Server) request(w http.ResponseWriter, r *http.Request) {
// (omit): [1]: Receive requests to be proxied
// [2]: Take a WebSocket connection available from pools for relaying received requests.
request := NewConnectionRequest(s.Config.GetTimeout())
// "Dispatcher" runs in a separate goroutine from the server, started by `go s.dispatchConnections()`.
// It waits for requests and dispatches connections from the available pools to them.
// https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L93
//
// Notify request from handler to dispatcher through Server.dispatcher channel.
s.dispatcher <- request
// Dispatcher tries to find an available connection pool,
// and it returns the connection through Server.connection channel.
// https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L189
//
// Here waiting for a result from dispatcher.
connection := <-request.connection
if connection == nil {
// It means that dispatcher has set `nil` which is a system error case that is
// not expected in the normal flow.
wsp.ProxyErrorf(w, "Unable to get a proxy connection")
return
}
// [3]: Send the request to the peer through the WebSocket connection.
if err := connection.proxyRequest(w, r); err != nil {
// An error occurred; throw the connection away
log.Println(err)
connection.Close()
// Try to return an error to the client
// This might fail if response headers have already been sent
wsp.ProxyError(w, err)
}
}
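To make the handler-to-dispatcher round trip easier to picture, here is a minimal sketch of the same pattern using only the standard library. The names conn, connRequest, dispatch, and acquire are hypothetical stand-ins I made up for this illustration; they are not the actual wsp types, and the real dispatcher also deals with pools and timeouts.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for a pooled WebSocket connection.
type conn struct{ id string }

// connRequest carries a reply channel, like request.connection in the handler.
type connRequest struct {
	reply chan *conn
}

// dispatch plays the role of the dispatcher goroutine: it answers each
// request with an idle connection, or nil when none is left.
func dispatch(requests <-chan *connRequest, idle []*conn) {
	for req := range requests {
		if len(idle) == 0 {
			req.reply <- nil
			continue
		}
		c := idle[0]
		idle = idle[1:]
		req.reply <- c
	}
}

// acquire is what the HTTP handler does: notify the dispatcher, then wait.
func acquire(requests chan<- *connRequest) *conn {
	req := &connRequest{reply: make(chan *conn)}
	requests <- req
	return <-req.reply
}

// demoDispatch asks for two connections while only one is idle.
func demoDispatch() (string, bool) {
	requests := make(chan *connRequest)
	go dispatch(requests, []*conn{{id: "pool-1"}})
	first := acquire(requests)
	second := acquire(requests)
	close(requests)
	return first.id, second == nil
}

func main() {
	id, exhausted := demoDispatch()
	fmt.Println("first connection:", id)
	fmt.Println("second request got nil:", exhausted)
}
```

The handler never touches the pool directly. It only sends a request and blocks on its own reply channel, so all pool bookkeeping stays inside one goroutine.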
In the implementation up to part 4, we were able to identify the WebSocket connection to relay through. Next, we send an HTTP request over that connection.
connection.proxyRequest does this. Let's look at the implementation. There are six steps.
- Serialize HTTP request
- Send the HTTP request to the peer
- Wait until the HTTP response is ready
- Read the HTTP response from the peer
- Wait until the HTTP response body is ready
- Read the HTTP response body from the peer
Here is a diagram describing the steps of communication over the WebSocket connection.
1. Serialize HTTP request / 2. Send the HTTP request on the WebSocket connection
First, it serializes an HTTP request and sends it to the WebSocket connection peer.
func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err error) {
log.Printf("proxy request to %s", connection.pool.id)
// [1]: Serialize HTTP request
jsonReq, err := json.Marshal(wsp.SerializeHTTPRequest(r))
if err != nil {
return fmt.Errorf("unable to serialize request : %w", err)
}
// [2]: Send the HTTP request to the peer
// Send the serialized HTTP request to the peer
if err := connection.ws.WriteMessage(websocket.TextMessage, jsonReq); err != nil {
return fmt.Errorf("unable to write request : %w", err)
}
// Pipe the HTTP request body to the peer
bodyWriter, err := connection.ws.NextWriter(websocket.BinaryMessage)
if err != nil {
return fmt.Errorf("unable to get request body writer : %w", err)
}
if _, err := io.Copy(bodyWriter, r.Body); err != nil {
return fmt.Errorf("unable to pipe request body : %w", err)
}
if err := bodyWriter.Close(); err != nil {
return fmt.Errorf("unable to pipe request body (close) : %w", err)
}
In a WebSocket connection, the kind of message is identified by the opcode defined in RFC 6455, and Text Frame and Binary Frame can be used for data exchange.
Opcode | Meaning |
---|---|
0 | Continuation Frame |
1 | Text Frame |
2 | Binary Frame |
8 | Connection Close Frame |
9 | Ping Frame |
10 | Pong Frame |
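As a small illustration of the table above, the opcodes can be written down as Go constants. The constant and function names here are my own for this sketch and are not taken from gorilla/websocket. The only rule used is the one from RFC 6455: an opcode whose most significant bit (0x8) is set denotes a control frame.

```go
package main

import "fmt"

// Opcodes from RFC 6455 (hypothetical constant names for this sketch).
const (
	opContinuation = 0x0
	opText         = 0x1
	opBinary       = 0x2
	opClose        = 0x8
	opPing         = 0x9
	opPong         = 0xA
)

// isControl reports whether a 4-bit opcode denotes a control frame.
// RFC 6455 marks control frames by the most significant bit of the opcode.
func isControl(opcode int) bool {
	return opcode&0x8 != 0
}

// isData reports whether the opcode starts a text or binary data message.
func isData(opcode int) bool {
	return opcode == opText || opcode == opBinary
}

func main() {
	for _, op := range []int{opContinuation, opText, opBinary, opClose, opPing, opPong} {
		fmt.Printf("opcode %2d control=%t data=%t\n", op, isControl(op), isData(op))
	}
}
```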
In this case, it sends the JSON-serialized HTTP metadata (URL, Method, Header, and Content-Length) in a Text Frame, and sends the request body in a Binary Frame.
type HTTPRequest struct {
Method string
URL string
Header map[string][]string
ContentLength int64
}
func SerializeHTTPRequest(req *http.Request) (r *HTTPRequest) {
r = new(HTTPRequest)
r.URL = req.URL.String()
r.Method = req.Method
r.Header = req.Header
r.ContentLength = req.ContentLength
return
}
SerializeHTTPRequest copies the parts of the HTTP request that need to be relayed into a struct that can be encoded.
req.Header is a value whose type is http.Header. A Header represents the key-value pairs in an HTTP header.
type Header map[string][]string
Encoded as JSON, a Header looks like this:
{
"Accept":["*/*"],
"User-Agent":["curl/7.77.0"],
"X-Proxy-Destination":["http://localhost:8081/hello"]
}
json.Marshal returns the JSON encoding of the value. Struct values encode as JSON objects.
jsonReq, err := json.Marshal(wsp.SerializeHTTPRequest(r))
Each exported struct field becomes a member of the object. If you want to customize how a struct field is encoded, you can use the struct field's json tag.
type HTTPRequest struct {
Method string `json:"customized"` // {"customized":"GET"}
}
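Here is a runnable sketch of both encodings, using only the standard library. HTTPRequest mirrors the struct shown earlier, while taggedRequest, encodeDefault, encodeTagged, and demoEncode are hypothetical names for this example only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// HTTPRequest mirrors the metadata struct from the article.
type HTTPRequest struct {
	Method        string
	URL           string
	Header        map[string][]string
	ContentLength int64
}

// taggedRequest is a hypothetical variant that renames keys with json tags.
type taggedRequest struct {
	Method string `json:"method"`
	URL    string `json:"url"`
}

// encodeDefault encodes the request metadata using the field names as keys.
func encodeDefault(r *http.Request) (string, error) {
	b, err := json.Marshal(HTTPRequest{
		Method:        r.Method,
		URL:           r.URL.String(),
		Header:        r.Header,
		ContentLength: r.ContentLength,
	})
	return string(b), err
}

// encodeTagged encodes the same request using the keys given in the tags.
func encodeTagged(r *http.Request) (string, error) {
	b, err := json.Marshal(taggedRequest{Method: r.Method, URL: r.URL.String()})
	return string(b), err
}

// demoEncode builds a sample request and returns both encodings.
func demoEncode() (string, string, error) {
	req, err := http.NewRequest(http.MethodGet, "http://localhost:8081/hello", nil)
	if err != nil {
		return "", "", err
	}
	req.Header.Set("Accept", "*/*")
	plain, err := encodeDefault(req)
	if err != nil {
		return "", "", err
	}
	tagged, err := encodeTagged(req)
	return plain, tagged, err
}

func main() {
	plain, tagged, err := demoEncode()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(plain)
	fmt.Println(tagged)
}
```

Without tags the JSON keys are the Go field names (Method, URL, and so on). With tags they become whatever the tag says, which matters if the peer is not written in Go.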
Once serialization is complete, it sends the JSON data.
connection.ws.WriteMessage(websocket.TextMessage, jsonReq)
WriteMessage is a method of websocket.Conn that writes a message to the connection, and websocket.TextMessage is a constant denoting a text data message.
Then, pipe the HTTP request body to the WebSocket connection peer.
// Pipe the HTTP request body to the peer
bodyWriter, err := connection.ws.NextWriter(websocket.BinaryMessage)
if err != nil {
return fmt.Errorf("unable to get request body writer : %w", err)
}
if _, err := io.Copy(bodyWriter, r.Body); err != nil {
return fmt.Errorf("unable to pipe request body : %w", err)
}
if err := bodyWriter.Close(); err != nil {
return fmt.Errorf("unable to pipe request body (close) : %w", err)
}
websocket.NextWriter returns a writer for the next message to send. An application can send and receive messages using the io.WriteCloser and io.Reader interfaces. In this case, the content of r.Body is written to the writer, and the message is completed when the writer is closed.
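The copy-then-close pattern can be tried without a real WebSocket connection. In this sketch, messageWriter is a hypothetical in-memory stand-in for the io.WriteCloser returned by NextWriter, and pipeBody and demoPipe are names I made up for the example. It only shows the order of operations, not how gorilla/websocket frames the data.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// messageWriter is a hypothetical in-memory stand-in for the writer
// returned by NextWriter. It records the payload and whether Close ran.
type messageWriter struct {
	buf    bytes.Buffer
	closed bool
}

func (m *messageWriter) Write(p []byte) (int, error) {
	if m.closed {
		return 0, io.ErrClosedPipe
	}
	return m.buf.Write(p)
}

func (m *messageWriter) Close() error {
	m.closed = true
	return nil
}

// pipeBody streams body into dst and closes dst to finish the message.
func pipeBody(dst io.WriteCloser, body io.Reader) error {
	if _, err := io.Copy(dst, body); err != nil {
		return fmt.Errorf("unable to pipe request body : %w", err)
	}
	if err := dst.Close(); err != nil {
		return fmt.Errorf("unable to pipe request body (close) : %w", err)
	}
	return nil
}

// demoPipe pipes a sample body and reports what the writer received.
func demoPipe() (string, bool, error) {
	w := &messageWriter{}
	err := pipeBody(w, strings.NewReader(`{"name":"gopher"}`))
	return w.buf.String(), w.closed, err
}

func main() {
	payload, closed, err := demoPipe()
	fmt.Println(payload, closed, err)
}
```

Because io.Copy streams the data in chunks, the request body never has to be held in memory as a whole.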
3. Wait until the HTTP response is ready to read
Let's move on to the rest of the code, which receives an HTTP response and parses it. This is one of the difficult points in proxying an HTTP request over a WebSocket connection, because the wsp_server does not know when the wsp_client will write the HTTP response to the WebSocket connection. We need to implement waiting for the response to come back.
I'll show you the whole picture first to address the issue.
Beforehand, the read function has been started as another thread (goroutine), "reader", which waits to receive the HTTP response.
type Connection struct {
// (omit)
nextResponse chan chan io.Reader
}
// NewConnection returns a new Connection.
func NewConnection(pool *Pool, ws *websocket.Conn) *Connection {
// (omit: Initialize a new Connection)
// Start to listen to incoming messages over the WebSocket connection
go c.read()
return c
}
func (connection *Connection) read() {
defer func() {
if r := recover(); r != nil {
log.Printf("WebSocket crash recovered : %s", r)
}
connection.Close()
}()
for {
if connection.status == Closed {
break
}
// https://godoc.org/github.com/gorilla/websocket#hdr-Control_Messages
_, reader, err := connection.ws.NextReader()
if err != nil {
break
}
if connection.status != Busy {
// We received a wild unexpected message
break
}
// When it gets here, either an HTTPResponse or an HTTP response body is expected to have been received.
//
// Next, it waits to receive the value from the Connection.proxyRequest function that is invoked in the "server" thread.
c := <-connection.nextResponse
if c == nil {
// We have been unlocked by Close()
break
}
// Send the reader back to Connection.proxyRequest
c <- reader
// Wait for proxyRequest to close the channel
// this notify that it is done with the reader
<-c
}
}
When a new WebSocket connection is established, it starts a goroutine that listens to incoming messages over the WebSocket connection. It keeps running until the connection itself is closed or an unexpected error occurs. In this loop, websocket.NextReader is used to read incoming messages.
// read loop
for {
_, reader, err := connection.ws.NextReader()
if err != nil {
break
}
}
NextReader returns the next data message received from the peer. In this case, it means the message from a wsp_client.
func (c *Conn) NextReader() (messageType int, r io.Reader, err error)
The returned messageType, which corresponds to the opcode in RFC 6455, is either TextMessage or BinaryMessage. In addition to these opcodes, the WebSocket protocol defines three control messages: close (8: Connection Close Frame), ping (9: Ping Frame), and pong (10: Pong Frame). If you implement WebSocket messaging from scratch, you need to process these control messages, but gorilla/websocket is designed so that you don't have to be concerned about them.
If the application is not otherwise interested in messages from the peer, then the application should start a goroutine to read and discard messages from the peer.
https://pkg.go.dev/github.com/gorilla/websocket@v1.4.2#hdr-Control_Messages
Also, the application must break out of the read loop when websocket.NextReader returns a non-nil error value. gorilla/websocket is implemented so that if you do not break out of the read loop and keep calling it repeatedly, you will get a panic like this:
// gorilla/websocket conn.go
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
// (omit)
// Applications that do handle the error returned from this method spin in
// tight loop on connection failure. To help application developers detect
// this error, panic on repeated reads to the failed connection.
c.readErrCount++
if c.readErrCount >= 1000 {
panic("repeated read on failed websocket connection")
}
return noFrame, nil, c.readErr
}
NextReader will block here until a message is received or the WebSocket connection is closed. The following code in gorilla/websocket keeps reading the next data frame until a TextMessage or BinaryMessage is received. Note that a data frame is a message format in the WebSocket protocol defined in RFC 6455.
// gorilla/websocket conn.go
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
// (omit)
for c.readErr == nil {
frameType, err := c.advanceFrame()
if err != nil {
c.readErr = hideTempErr(err)
break
}
if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
if c.readDecompress {
c.reader = c.newDecompressionReader(c.reader)
}
return frameType, c.reader, nil
}
}
// (omit)
}
See the following link for more detail.
How WebSocket protocol designs bidirectional messaging and implements in Go
Kazuki Higashiguchi ・ Dec 13 '21
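To get a feel for the "keep reading until a data frame arrives" loop, here is a simplified sketch that works on a slice of already-parsed frames instead of a network connection. The frame type, nextDataFrame, and the constants are hypothetical and only for illustration. The real NextReader also runs ping, pong, and close handling while it advances.

```go
package main

import (
	"errors"
	"fmt"
)

// Message types matching the RFC 6455 opcodes used in this post.
const (
	textMessage   = 1
	binaryMessage = 2
	pingMessage   = 9
	pongMessage   = 10
)

// frame is a hypothetical, already-parsed WebSocket frame.
type frame struct {
	opcode  int
	payload string
}

var errNoDataFrame = errors.New("no data frame received")

// nextDataFrame skips control frames and returns the first text or binary
// frame together with the frames that have not been read yet.
func nextDataFrame(incoming []frame) (frame, []frame, error) {
	for i, f := range incoming {
		if f.opcode == textMessage || f.opcode == binaryMessage {
			return f, incoming[i+1:], nil
		}
		// A control frame: a real implementation would handle it here
		// and then keep reading.
	}
	return frame{}, nil, errNoDataFrame
}

// demoNextDataFrame feeds two control frames followed by two data frames.
func demoNextDataFrame() (string, int, error) {
	incoming := []frame{
		{pingMessage, ""},
		{pongMessage, ""},
		{textMessage, `{"StatusCode":200}`},
		{binaryMessage, "hello"},
	}
	f, rest, err := nextDataFrame(incoming)
	return f.payload, len(rest), err
}

func main() {
	payload, remaining, err := demoNextDataFrame()
	fmt.Println(payload, remaining, err)
}
```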
After a message is received from the peer, it waits to receive the value from the Connection.proxyRequest function that is invoked in the "server" thread.
func (connection *Connection) read() {
// (omit)
for {
// (omit)
c := <-connection.nextResponse
if c == nil {
// We have been unlocked by Close()
break
}
// Send the reader back to Connection.proxyRequest
c <- reader
// (omit)
}
}
When a "server" thread proxies a request, it sends the HTTP request over the WebSocket connection, then sends a channel of the io.Reader interface (chan io.Reader), through which the HTTP response can be read, to the field nextResponse. It then waits until another thread, "reader", writes a value to that channel (chan io.Reader).
After the "reader" thread detects that the peer has written the HTTP response to the WebSocket connection, it sends the reader to the channel (chan io.Reader), and the "server" thread can proceed with the rest of the procedure.
type Connection struct {
// (omit)
nextResponse chan chan io.Reader
}
func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err error) {
// (omit) [1]: Serialize HTTP request
// (omit) [2]: Send the HTTP request to the peer
// [3]: Wait until the HTTP response is ready to read
responseChannel := make(chan (io.Reader))
connection.nextResponse <- responseChannel
responseReader, ok := <-responseChannel
if responseReader == nil {
if ok {
// ok is true, so the channel is still open and must be closed here.
// If ok were false, the channel would already be closed and empty.
// See "Receive operator" in https://go.dev/ref/spec for more information.
close(responseChannel)
}
// err is nil at this point, so there is nothing to wrap with %w.
return fmt.Errorf("unable to get http response reader")
}
}
When we get to this point, we are ready to read the response written over the WebSocket connection.
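The "channel of channels" handshake is easier to follow in isolation. The following sketch reproduces it with plain strings instead of WebSocket messages. handOff plays the "reader" thread and receive plays proxyRequest; both names, as well as exchange, are hypothetical and exist only in this example.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// handOff simulates the "reader" goroutine. For each incoming message it
// waits for a reply channel, hands over a reader, and then blocks until
// the consumer closes that channel.
func handOff(messages []string, nextResponse chan chan io.Reader, done chan<- struct{}) {
	defer close(done)
	for _, m := range messages {
		reply := <-nextResponse
		if reply == nil {
			return
		}
		reply <- strings.NewReader(m)
		<-reply // returns once the consumer closes reply
	}
}

// receive simulates the "server" side: it asks for the next reader,
// consumes it, and closes the reply channel to signal that it is done.
func receive(nextResponse chan chan io.Reader) (string, error) {
	reply := make(chan io.Reader)
	nextResponse <- reply
	r := <-reply
	defer close(reply)
	b, err := io.ReadAll(r)
	return string(b), err
}

// exchange runs both sides and returns the messages in arrival order.
func exchange(messages []string) ([]string, error) {
	nextResponse := make(chan chan io.Reader)
	done := make(chan struct{})
	go handOff(messages, nextResponse, done)
	var got []string
	for range messages {
		s, err := receive(nextResponse)
		if err != nil {
			return nil, err
		}
		got = append(got, s)
	}
	<-done
	return got, nil
}

func main() {
	got, err := exchange([]string{`{"StatusCode":200}`, "response body"})
	fmt.Println(got, err)
}
```

Closing the reply channel works as a "done" signal because a receive on a closed channel returns immediately. The producing side therefore never moves on to the next message while the previous reader is still in use.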
4. Read the HTTP response from the peer
Let's move on to the next step: reading the HTTP response from the peer, which gives us the serialized HTTP response.
type HTTPResponse struct {
StatusCode int
Header http.Header
ContentLength int64
}
func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err error) {
// (omit)
// [4]: Read the HTTP response from the peer
// Get the serialized HTTP Response from the peer
jsonResponse, err := io.ReadAll(responseReader)
if err != nil {
close(responseChannel)
return fmt.Errorf("unable to read http response : %w", err)
}
// Notify the read() goroutine that we are done reading the response
close(responseChannel)
// Deserialize the HTTP Response
httpResponse := new(wsp.HTTPResponse)
if err := json.Unmarshal(jsonResponse, httpResponse); err != nil {
return fmt.Errorf("unable to unserialize http response : %w", err)
}
// Write response headers back to the client
for header, values := range httpResponse.Header {
for _, value := range values {
w.Header().Add(header, value)
}
}
w.WriteHeader(httpResponse.StatusCode)
// (omit)
}
The value responseReader implements the io.Reader interface, which is why we can read its data with io.ReadAll, which was added in Go 1.16.
After that, it closes the channel responseChannel so that the "reader" thread is notified that the response has been read.
func (connection *Connection) read() {
// (omit)
for {
// (omit)
// Send the reader back to Connection.proxyRequest
c <- reader
// Wait for proxyRequest to close the channel
// this notify that it is done with the reader
<-c
}
}
The "reader" thread waits until the "server" thread closes the responseChannel channel because of the requirements of websocket.NextReader. As explained in the godoc, we need to take the following constraints into account:
- No more than one goroutine calls the read methods (NextReader, SetReadDeadline, ReadMessage, ReadJSON, SetPongHandler, SetPingHandler) concurrently
- Connections support one concurrent reader and one concurrent writer
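On top of that, a connection can have at most one open reader, and calling NextReader again discards whatever is left of the previous message. That's why we need to wait for the reader returned by websocket.NextReader to be consumed before requesting the next one.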
Next, it unmarshals the JSON into the struct HTTPResponse with json.Unmarshal. json.Unmarshal matches incoming JSON object keys to the keys used by Marshal, either the struct field name or its tag (json:"xxx").
Now we can get the HTTP response headers and status code.
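The deserialize-and-write-back step can be tried on its own with httptest.ResponseRecorder from the standard library. HTTPResponse mirrors the struct above, while writeResponseHead and demoResponseHead are hypothetical helper names for this sketch, and the JSON payload is made-up sample data.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// HTTPResponse mirrors the response metadata struct from the article.
type HTTPResponse struct {
	StatusCode    int
	Header        http.Header
	ContentLength int64
}

// writeResponseHead decodes the serialized metadata and writes the
// headers and status code to w.
func writeResponseHead(w http.ResponseWriter, jsonResponse []byte) error {
	resp := new(HTTPResponse)
	if err := json.Unmarshal(jsonResponse, resp); err != nil {
		return fmt.Errorf("unable to deserialize http response : %w", err)
	}
	for name, values := range resp.Header {
		for _, value := range values {
			w.Header().Add(name, value)
		}
	}
	w.WriteHeader(resp.StatusCode)
	return nil
}

// demoResponseHead feeds sample metadata into a recorder and returns
// the status code and Content-Type header that were written.
func demoResponseHead() (int, string, error) {
	rec := httptest.NewRecorder()
	payload := []byte(`{"StatusCode":404,"Header":{"Content-Type":["text/plain"]},"ContentLength":9}`)
	if err := writeResponseHead(rec, payload); err != nil {
		return 0, "", err
	}
	return rec.Code, rec.Header().Get("Content-Type"), nil
}

func main() {
	code, contentType, err := demoResponseHead()
	fmt.Println(code, contentType, err)
}
```

Note that the headers have to be added before WriteHeader is called. Changes made to the header map after that point are not sent to the client.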
5. Wait until the HTTP response body is ready / 6. Read the HTTP response body from the peer
Finally, it follows the same steps once more to read the HTTP response body.
func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Request) (err error) {
// (omit)
// [5]: Wait until the HTTP response body is ready
// Get the HTTP Response body from the peer
// To do so send a new channel to the read() goroutine
// to get the next message reader
responseBodyChannel := make(chan (io.Reader))
connection.nextResponse <- responseBodyChannel
responseBodyReader, ok := <-responseBodyChannel
if responseBodyReader == nil {
if ok {
// ok is true, so the channel is still open and must be closed here.
// If ok were false, the channel would already be closed.
close(responseBodyChannel)
}
// err is nil at this point, so there is nothing to wrap with %w.
return fmt.Errorf("unable to get http response body reader")
}
// [6]: Read the HTTP response body from the peer
// Pipe the HTTP response body right from the remote Proxy to the client
if _, err := io.Copy(w, responseBodyReader); err != nil {
close(responseBodyChannel)
return fmt.Errorf("unable to pipe response body : %w", err)
}
// Notify read() that we are done reading the response body
close(responseBodyChannel)
connection.Release()
return
}
Conclusion
Following part 3 and part 4, I explained how to relay a TCP connection from "App" to the peer of the WebSocket connection.
In part 6, I'll explain how to relay the TCP connection in WebSocket data to the "internal API".