DEV Community

Kazuki Higashiguchi
Kazuki Higashiguchi

Posted on

Reverse HTTP proxy over WebSocket in Go (Part 4)

Series introduction

In part 3 I talked about how to relay TCP connection from "App" to the peer of WebSocket, especially implementation to receive requests to be proxied and to pool the WebSocket connection on the server for relaying in Go.

In this post, I will be starting to talk about how to relay TCP connection from "App" to the peer of WebSocket in Go.

  • Start a WebSocket server (Part 1)
  • Establish a WebSocket connection (Part 2)
  • Relay TCP connection from "App" to the peer of WebSocket (Part 3 | Part 4 | Part 5)
  • Relay TCP connection in WebSocket data to "internal API"
  • Keep a established connection

Reverse HTTP proxy over WebSocket

A reverse HTTP proxy over WebSocket is a type of proxies, which retrieves resources on behalf on a client from servers and uses the WebSocket protocol as a "tunnel" to pass TCP communication from server to client.

A network diagram for reverse proxy over WebSocket

I'll introduce Go sample project forked from root-gg/wsp (I forked it because maintenance has stopped and the Go language and libraries version needed to be updated).

GitHub logo hgsgtk / wsp

HTTP tunnel over Websocket

Relay TCP connection to the peer WebSocket

A reverse HTTP proxy over WebSocket relay TCP connection to the peer WebSocket over the WebSocket connection.

HTTP communication is relayed by the following route.

app -[1]-> wsp server -[2](WebSocket)-> wsp client -> internal API
Enter fullscreen mode Exit fullscreen mode

And, these flow are divided into three parts to explain it.

  1. Receive requests to be proxied ([1] in the relay flow)
  2. Pool the WebSocket connection on the server for relaying
  3. Relay TCP connection to the peer WebSocket ([2] in the relay flow)

I explained the 1st and 2nd flow in part 3, so let's move on the 3rd flow.

Let's take a look at the continuation of the previous HTTP handler code, which waits the request from "app" to the endpoint /requests/.

func (s *Server) request(w http.ResponseWriter, r *http.Request) {
    // (omit): [1]: Receive requests to be proxied

    // [2]: Take a WebSocket connection available from pools for relaying received requests.
    request := NewConnectionRequest(s.Config.GetTimeout())
    // "Dispatcher" is running in a separate thread from the server by `go s.dispatchConnections()`.
    // It waits to receive requests to dispatch connection from available pools to clients requests.
    // https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L93
    //
    // Notify request from handler to dispatcher through Server.dispatcher channel.
    s.dispatcher <- request
    // Dispatcher tries to find an available connection pool,
    // and it returns the connection through Server.connection channel.
    // https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L189
    //
    // Here waiting for a result from dispatcher.
    connection := <-request.connection
    if connection == nil {
        // It means that dispatcher has set `nil` which is a system error case that is
        // not expected in the normal flow.
        wsp.ProxyErrorf(w, "Unable to get a proxy connection")
        return
    }

    // [3]: Send the request to the peer through the WebSocket connection.
    if err := connection.proxyRequest(w, r); err != nil {
        // An error occurred throw the connection away
        log.Println(err)
        connection.Close()

        // Try to return an error to the client
        // This might fail if response headers have already been sent
        wsp.ProxyError(w, err)
    }
}
Enter fullscreen mode Exit fullscreen mode

In brief, the following process is performed.

  • Take a WebSocket connection available from pools for relaying received requests
  • Send the request to the peer through the WebSocket connection

I'll explain the one by one.

Take a WebSocket connection available from pools for relaying received requests

As a pre-requisite, we will start with WebSocket connection already established with the WebSocket client (wsp_client) and held by the server as a pool (Chapter 2).

func (s *Server) Register(w http.ResponseWriter, r *http.Request) {
    // - 1. Upgrade a received HTTP request to a WebSocket connection
    // (omit)
    // - 2. Wait a greeting message from the peer and parse it
    // (omit)

    // 3. Register the connection into server pools.
    // (omit)
    // Add the WebSocket connection to the pool
    pool.Register(ws)
}
Enter fullscreen mode Exit fullscreen mode

There are several possible designs for retrieving the connection from the pools, and I'll explain the pattern of using multiple threads. Specifically, there are two threads running: "Server", which accepts http requests, and "Dispatcher", which dispatches connections from the pools to be used to relay. Here is the Go code to start "Server" and "Dispatcher" that will be called from the main function.

func (s *Server) Start() {
    // (omit)...

    // start the "Dispatcher"
    go s.dispatchConnections()

    // start the "Server"
    s.server = &http.Server{
        Addr:    s.Config.GetAddr(),
        Handler: r,
    }
    go func() { log.Fatal(s.server.ListenAndServe()) }()
}
Enter fullscreen mode Exit fullscreen mode

The go statement starts the execution of a function call as an independent concurrent thread of control, or goroutine, within the same the address space.

A diagram describing two threads in wsp_server

Let's move on the implementation of dispatchConnections function that starts a "dispatcher" thread.

type Server struct {
    // (omit)

    // Through dispatcher channel it communicates between "server" thread and "dispatcher" thread.
    // "server" thread sends the value to this channel when accepting requests in the endpoint /requests, 
    // and "dispatcher" thread reads this channel.
    dispatcher chan *ConnectionRequest
}

func (s *Server) dispatchConnections() {
    for {
        //  The operator <- is "receive operator", which expression blocks until a value is available.
        request, ok := <-s.dispatcher
        if !ok {
            // The value of `ok` is false if it is a zero value generated because the channel is closed an empty.
            // In this case, that means server shutdowns.           break
        }

        for {
            // (omit)...

            // Verify that we can use this connection
            if connection.Take() {
                request.connection <- connection
                break
            }
        }

        // (omit)...
    }
}
Enter fullscreen mode Exit fullscreen mode

The type of the field dispatcher in the Server structure is channel. Channel types provide a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type.

A diagram describing how dispatcher channel is used

for {
    request, ok := <-s.dispatcher
    if !ok {
        break
    }
}
Enter fullscreen mode Exit fullscreen mode

The operator x, ok := <-ch is receiver operator. The value of the receive operation <-s.dispatcher is the value received from the channel s.dispatcher. Also, the expression <- blocks until a value is available, so waits until a connection request is sent by "Server" thread.

The value of ok is false if it is a zero value generated because the channel is closed or empty. In this case, that means server shutdowns.

A diagram describing how dispatcher channel is used

On the other hand, the /request/ handler on the "Server" tread sends a value to this channel.

func (s *Server) request(w http.ResponseWriter, r *http.Request) {
    // (omit): [1]: Receive requests to be proxied

    // Here! Sends a value to dispatcher channel
    request := NewConnectionRequest(s.Config.GetTimeout())
    s.dispatcher <- request

    connection := <-request.connection
    if connection == nil {
        wsp.ProxyErrorf(w, "Unable to get a proxy connection")
        return
    }
}
Enter fullscreen mode Exit fullscreen mode

The operator ch <- x is send statements, which sends a value on a channel. Here it sends the request to the dispatcher channel.

By the way, the type of variable request is ConnectionRequest.

type ConnectionRequest struct {
    connection chan *Connection
}
Enter fullscreen mode Exit fullscreen mode

After sending the value to the s.dispatcher channel, it waits to be available to get the value in the request.connection channel.

A diagram describing how connection channel is used

func (s *Server) request(w http.ResponseWriter, r *http.Request) {
    // (omit)
    s.dispatcher <- request

    // Here!
    connection := <-request.connection
    if connection == nil {
        wsp.ProxyErrorf(w, "Unable to get a proxy connection")
        return
    }
}
Enter fullscreen mode Exit fullscreen mode

Next, let's look at the "Dispatcher" thread.

To summarize, "Server" thread sends the value to this channel when accepting requests in the endpoint /requests, and "dispatcher" thread reads this channel.

Let's move on the implementation of dispatchConnections function that starts a "dispatcher" thread.

type Server struct {
    // (omit)

    // Through dispatcher channel it communicates between "server" thread and "dispatcher" thread.
    // "server" thread sends the value to this channel when accepting requests in the endpoint /requests, 
    // and "dispatcher" thread reads this channel.
    dispatcher chan *ConnectionRequest
}

func (s *Server) dispatchConnections() {
    for {
        // Runs in an infinite loop and keeps receiving the value from the `server.dispatcher` channel
        // The operator <- is "receive operator", which expression blocks until a value is available.
        request, ok := <-s.dispatcher
        if !ok {
            // The value of `ok` is false if it is a zero value generated because the channel is closed an empty.
            // In this case, that means server shutdowns.
            break
        }

        // A timeout is set for each dispatch request.
        ctx := context.Background()
        ctx, cancel := context.WithTimeout(ctx, s.Config.GetTimeout())
        defer cancel()

    L:
        for {
            select {
            case <-ctx.Done(): // The timeout elapses
                break L
            default: // Go through
            }

            s.lock.RLock()
            if len(s.pools) == 0 {
                // No connection pool available
                s.lock.RUnlock()
                break
            }

            // [1]: Select a pool which has an idle connection
            // Build a select statement dynamically to handle an arbitrary number of pools.
            cases := make([]reflect.SelectCase, len(s.pools)+1)
            for i, ch := range s.pools {
                cases[i] = reflect.SelectCase{
                    Dir:  reflect.SelectRecv,
                    Chan: reflect.ValueOf(ch.idle)}
            }
            cases[len(cases)-1] = reflect.SelectCase{
                Dir: reflect.SelectDefault}
            s.lock.RUnlock()

            _, value, ok := reflect.Select(cases)
            if !ok {
                continue // a pool has been removed, try again
            }
            connection, _ := value.Interface().(*Connection)

            // [2]: Verify that we can use this connection and take it.
            if connection.Take() {
                request.connection <- connection
                break
            }
        }

        close(request.connection)
    }
}
Enter fullscreen mode Exit fullscreen mode

First, dispatchConnections runs in an infinite loop and keeps receiving the value from the server.dispatcher channel.

for {
    request, ok := <-s.dispatcher
    // ...
}
Enter fullscreen mode Exit fullscreen mode

The next step is to set the timeout, if no idle connection is obtained after a predetermined time, the channel will be closed.

// A timeout is set for each dispatch request.
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, s.Config.GetTimeout())
defer cancel()

L:
    for {
        select {
        case <-ctx.Done(): // The timeout elapses
            break L
        default: // Go through
        }

        // ...
    }

    close(request.connection)

Enter fullscreen mode Exit fullscreen mode

When the channel is closed, a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received (See The Go Programming Language Specification#Receive operator more detail). In this case, the "Server" thread waits to receive the value, and will get nil from request.connection channel, so the "Server" will return the error response.

func (s *Server) request(w http.ResponseWriter, r *http.Request) {
    // (omit)
    s.dispatcher <- request

    // Here!
    connection := <-request.connection
    if connection == nil {
        wsp.ProxyErrorf(w, "Unable to get a proxy connection")
        return
    }
}
Enter fullscreen mode Exit fullscreen mode

Also, if you want to know more about timeout using the context package, please refer to the following post.

Then, select a pool which has an idle connection by building a select statement dynamically to handle an arbitrary number of pools.

cases := make([]reflect.SelectCase, len(s.pools)+1)
for i, ch := range s.pools {
    cases[i] = reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(ch.idle)}
}
cases[len(cases)-1] = reflect.SelectCase{
    Dir: reflect.SelectDefault}
s.lock.RUnlock()

_, value, ok := reflect.Select(cases)
if !ok {
    continue // a pool has been removed, try again
}
connection, _ := value.Interface().(*Connection)
Enter fullscreen mode Exit fullscreen mode

reflect.Select allows us to receive a variable number of channels. See the following post for more information.

Lastly, Verify that we can use this connection and take it.

if connection.Take() {
    request.connection <- connection
    break
}
Enter fullscreen mode Exit fullscreen mode

connection.Take verify the status of connection whether it's available one or not, then if it's available one, mark it busy.

A diagram describing how connection channel is used

That's it to take a WebSocket connection available from pools for relaying received requests.

Conclusion

Following part 3, I explained how to relay TCP connection from "App" to the peer of WebSocket. Especially, I focused on the way to take a WebSocket connection available from pools for relaying received requests.

In part 5, I'll explain how to send the request to the peer through the WebSocket connection.

Top comments (0)