Series introduction
In part 3 I talked about how to relay TCP connection from "App" to the peer of WebSocket, especially implementation to receive requests to be proxied and to pool the WebSocket connection on the server for relaying in Go.
Reverse HTTP proxy over WebSocket in Go (Part 3)
Kazuki Higashiguchi ・ Dec 16 '21
In this post, I will be starting to talk about how to relay TCP connection from "App" to the peer of WebSocket in Go.
- Start a WebSocket server (Part 1)
- Establish a WebSocket connection (Part 2)
- Relay TCP connection from "App" to the peer of WebSocket (Part 3 | Part 4 | Part 5)
- Relay TCP connection in WebSocket data to "internal API"
- Keep a established connection
Reverse HTTP proxy over WebSocket
A reverse HTTP proxy over WebSocket is a type of proxies, which retrieves resources on behalf on a client from servers and uses the WebSocket protocol as a "tunnel" to pass TCP communication from server to client.
I'll introduce Go sample project forked from root-gg/wsp (I forked it because maintenance has stopped and the Go language and libraries version needed to be updated).
Relay TCP connection to the peer WebSocket
A reverse HTTP proxy over WebSocket relay TCP connection to the peer WebSocket over the WebSocket connection.
HTTP communication is relayed by the following route.
app -[1]-> wsp server -[2](WebSocket)-> wsp client -> internal API
And, these flow are divided into three parts to explain it.
- Receive requests to be proxied (
[1]
in the relay flow) - Pool the WebSocket connection on the server for relaying
- Relay TCP connection to the peer WebSocket (
[2]
in the relay flow)
I explained the 1st and 2nd flow in part 3, so let's move on the 3rd flow.
Let's take a look at the continuation of the previous HTTP handler code, which waits the request from "app" to the endpoint /requests/
.
func (s *Server) request(w http.ResponseWriter, r *http.Request) {
// (omit): [1]: Receive requests to be proxied
// [2]: Take a WebSocket connection available from pools for relaying received requests.
request := NewConnectionRequest(s.Config.GetTimeout())
// "Dispatcher" is running in a separate thread from the server by `go s.dispatchConnections()`.
// It waits to receive requests to dispatch connection from available pools to clients requests.
// https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L93
//
// Notify request from handler to dispatcher through Server.dispatcher channel.
s.dispatcher <- request
// Dispatcher tries to find an available connection pool,
// and it returns the connection through Server.connection channel.
// https://github.com/hgsgtk/wsp/blob/ea4902a8e11f820268e52a6245092728efeffd7f/server/server.go#L189
//
// Here waiting for a result from dispatcher.
connection := <-request.connection
if connection == nil {
// It means that dispatcher has set `nil` which is a system error case that is
// not expected in the normal flow.
wsp.ProxyErrorf(w, "Unable to get a proxy connection")
return
}
// [3]: Send the request to the peer through the WebSocket connection.
if err := connection.proxyRequest(w, r); err != nil {
// An error occurred throw the connection away
log.Println(err)
connection.Close()
// Try to return an error to the client
// This might fail if response headers have already been sent
wsp.ProxyError(w, err)
}
}
In brief, the following process is performed.
- Take a WebSocket connection available from pools for relaying received requests
- Send the request to the peer through the WebSocket connection
I'll explain the one by one.
Take a WebSocket connection available from pools for relaying received requests
As a pre-requisite, we will start with WebSocket connection already established with the WebSocket client (wsp_client
) and held by the server as a pool (Chapter 2).
func (s *Server) Register(w http.ResponseWriter, r *http.Request) {
// - 1. Upgrade a received HTTP request to a WebSocket connection
// (omit)
// - 2. Wait a greeting message from the peer and parse it
// (omit)
// 3. Register the connection into server pools.
// (omit)
// Add the WebSocket connection to the pool
pool.Register(ws)
}
There are several possible designs for retrieving the connection from the pools, and I'll explain the pattern of using multiple threads. Specifically, there are two threads running: "Server", which accepts http requests, and "Dispatcher", which dispatches connections from the pools to be used to relay. Here is the Go code to start "Server" and "Dispatcher" that will be called from the main function.
func (s *Server) Start() {
// (omit)...
// start the "Dispatcher"
go s.dispatchConnections()
// start the "Server"
s.server = &http.Server{
Addr: s.Config.GetAddr(),
Handler: r,
}
go func() { log.Fatal(s.server.ListenAndServe()) }()
}
The go statement starts the execution of a function call as an independent concurrent thread of control, or goroutine, within the same the address space.
Let's move on the implementation of dispatchConnections function that starts a "dispatcher" thread.
type Server struct {
// (omit)
// Through dispatcher channel it communicates between "server" thread and "dispatcher" thread.
// "server" thread sends the value to this channel when accepting requests in the endpoint /requests,
// and "dispatcher" thread reads this channel.
dispatcher chan *ConnectionRequest
}
func (s *Server) dispatchConnections() {
for {
// The operator <- is "receive operator", which expression blocks until a value is available.
request, ok := <-s.dispatcher
if !ok {
// The value of `ok` is false if it is a zero value generated because the channel is closed an empty.
// In this case, that means server shutdowns. break
}
for {
// (omit)...
// Verify that we can use this connection
if connection.Take() {
request.connection <- connection
break
}
}
// (omit)...
}
}
The type of the field dispatcher
in the Server
structure is channel. Channel types provide a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type.
for {
request, ok := <-s.dispatcher
if !ok {
break
}
}
The operator x, ok := <-ch
is receiver operator. The value of the receive operation <-s.dispatcher
is the value received from the channel s.dispatcher
. Also, the expression <-
blocks until a value is available, so waits until a connection request is sent by "Server" thread.
The value of ok
is false if it is a zero value generated because the channel is closed or empty. In this case, that means server shutdowns.
On the other hand, the /request/
handler on the "Server" tread sends a value to this channel.
func (s *Server) request(w http.ResponseWriter, r *http.Request) {
// (omit): [1]: Receive requests to be proxied
// Here! Sends a value to dispatcher channel
request := NewConnectionRequest(s.Config.GetTimeout())
s.dispatcher <- request
connection := <-request.connection
if connection == nil {
wsp.ProxyErrorf(w, "Unable to get a proxy connection")
return
}
}
The operator ch <- x
is send statements, which sends a value on a channel. Here it sends the request to the dispatcher
channel.
By the way, the type of variable request
is ConnectionRequest.
type ConnectionRequest struct {
connection chan *Connection
}
After sending the value to the s.dispatcher
channel, it waits to be available to get the value in the request.connection
channel.
func (s *Server) request(w http.ResponseWriter, r *http.Request) {
// (omit)
s.dispatcher <- request
// Here!
connection := <-request.connection
if connection == nil {
wsp.ProxyErrorf(w, "Unable to get a proxy connection")
return
}
}
Next, let's look at the "Dispatcher" thread.
To summarize, "Server" thread sends the value to this channel when accepting requests in the endpoint /requests, and "dispatcher" thread reads this channel.
Let's move on the implementation of dispatchConnections function that starts a "dispatcher" thread.
type Server struct {
// (omit)
// Through dispatcher channel it communicates between "server" thread and "dispatcher" thread.
// "server" thread sends the value to this channel when accepting requests in the endpoint /requests,
// and "dispatcher" thread reads this channel.
dispatcher chan *ConnectionRequest
}
func (s *Server) dispatchConnections() {
for {
// Runs in an infinite loop and keeps receiving the value from the `server.dispatcher` channel
// The operator <- is "receive operator", which expression blocks until a value is available.
request, ok := <-s.dispatcher
if !ok {
// The value of `ok` is false if it is a zero value generated because the channel is closed an empty.
// In this case, that means server shutdowns.
break
}
// A timeout is set for each dispatch request.
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, s.Config.GetTimeout())
defer cancel()
L:
for {
select {
case <-ctx.Done(): // The timeout elapses
break L
default: // Go through
}
s.lock.RLock()
if len(s.pools) == 0 {
// No connection pool available
s.lock.RUnlock()
break
}
// [1]: Select a pool which has an idle connection
// Build a select statement dynamically to handle an arbitrary number of pools.
cases := make([]reflect.SelectCase, len(s.pools)+1)
for i, ch := range s.pools {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch.idle)}
}
cases[len(cases)-1] = reflect.SelectCase{
Dir: reflect.SelectDefault}
s.lock.RUnlock()
_, value, ok := reflect.Select(cases)
if !ok {
continue // a pool has been removed, try again
}
connection, _ := value.Interface().(*Connection)
// [2]: Verify that we can use this connection and take it.
if connection.Take() {
request.connection <- connection
break
}
}
close(request.connection)
}
}
First, dispatchConnections runs in an infinite loop and keeps receiving the value from the server.dispatcher
channel.
for {
request, ok := <-s.dispatcher
// ...
}
The next step is to set the timeout, if no idle connection is obtained after a predetermined time, the channel will be closed.
// A timeout is set for each dispatch request.
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, s.Config.GetTimeout())
defer cancel()
L:
for {
select {
case <-ctx.Done(): // The timeout elapses
break L
default: // Go through
}
// ...
}
close(request.connection)
When the channel is closed, a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value after any previously sent values have been received (See The Go Programming Language Specification#Receive operator more detail). In this case, the "Server" thread waits to receive the value, and will get nil from request.connection channel, so the "Server" will return the error response.
func (s *Server) request(w http.ResponseWriter, r *http.Request) {
// (omit)
s.dispatcher <- request
// Here!
connection := <-request.connection
if connection == nil {
wsp.ProxyErrorf(w, "Unable to get a proxy connection")
return
}
}
Also, if you want to know more about timeout using the context package, please refer to the following post.
Then, select a pool which has an idle connection by building a select statement dynamically to handle an arbitrary number of pools.
cases := make([]reflect.SelectCase, len(s.pools)+1)
for i, ch := range s.pools {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch.idle)}
}
cases[len(cases)-1] = reflect.SelectCase{
Dir: reflect.SelectDefault}
s.lock.RUnlock()
_, value, ok := reflect.Select(cases)
if !ok {
continue // a pool has been removed, try again
}
connection, _ := value.Interface().(*Connection)
reflect.Select allows us to receive a variable number of channels. See the following post for more information.
Lastly, Verify that we can use this connection and take it.
if connection.Take() {
request.connection <- connection
break
}
connection.Take
verify the status of connection whether it's available one or not, then if it's available one, mark it busy.
That's it to take a WebSocket connection available from pools for relaying received requests.
Conclusion
Following part 3, I explained how to relay TCP connection from "App" to the peer of WebSocket. Especially, I focused on the way to take a WebSocket connection available from pools for relaying received requests.
In part 5, I'll explain how to send the request to the peer through the WebSocket connection.
Top comments (0)