
8. Zinx Message Queue and Task Worker Pool Design and Implementation

[Zinx]

<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>
<6.Design and Implementation of Zinx Multi-Router Mode>
<7. Building Zinx's Read-Write Separation Model>
<8.Zinx Message Queue and Task Worker Pool Design and Implementation>
<9. Zinx Connection Management and Property Setting>

[Zinx Application - MMO Game Case Study]

<10. Application Case Study using the Zinx Framework>
<11. MMO Online Game AOI Algorithm>
<12.Data Transmission Protocol: Protocol Buffers>
<13. MMO Game Server Application Protocol>
<14. Building the Project and User Login>
<15. World Chat System Implementation>
<16. Online Location Information Synchronization>
<17. Moving position and non-crossing grid AOI broadcasting>
<18. Player Logout>
<19. Movement and AOI Broadcast Across Grids>


source code

https://github.com/aceld/zinx/blob/master/examples/zinx_release/zinx-v0.8.tar.gz


In this chapter, we will add a message queue and a multi-task Worker mechanism to Zinx. Instead of creating an unlimited number of Goroutines, we will use a fixed number of Workers to handle business tasks. Although Go's scheduler is well designed and highly optimized, a very large number of Goroutines still incurs unnecessary context-switching cost, and a server should keep that cost to a minimum. Therefore, we place a message queue in front of the Workers to buffer the data they process.

The design structure of the Worker Pool in Zinx is shown in Figure 8.1.


Figure 8.1: Worker Pool Design in Zinx

In the previous chapter (Zinx V0.7), we separated a connection's read and write business into two Goroutines: a Reader and a Writer. After adding the Worker Pool, the Reader receives Request messages from the client and pushes them into a TaskQueue of the Worker Pool. The Reader acts as the message producer, each task queue has a Worker that consumes it, and once a request has been consumed, the resulting data is handed to the Writer to be written back to the remote client.

The purpose of routing messages through the Worker Pool is to limit the number of Goroutines that process message business logic, regardless of the number of connections. This fixed set of Goroutines processes the message business logic in parallel and prevents the performance degradation caused by excessive Goroutine proliferation under high read/write throughput and high concurrency.

8.1 Creating a Message Queue

First, we integrate the message queue into the MsgHandler module, since queuing messages falls within the scope of message handling. The queue becomes a member attribute called TaskQueue of the MsgHandle message management module, and each queued message has the type IRequest. The definition is as follows:

// zinx/znet/msghandler.go

type MsgHandle struct {
    Apis            map[uint32]ziface.IRouter
    WorkerPoolSize  uint32                 // Number of workers in the business worker pool
    TaskQueue       []chan ziface.IRequest // Message queues that workers are responsible for picking tasks from
}

WorkerPoolSize represents the number of message queues; there is a 1:1 relationship between queues and Workers, and each Worker consumes exactly one queue. TaskQueue is a slice of channels, and it is good practice to allocate the slice with make during initialization. The initialization code is as follows:

func NewMsgHandle() *MsgHandle {
    return &MsgHandle{
        Apis:           make(map[uint32]ziface.IRouter),
        WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,
        TaskQueue:      make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
    }
}

Two members are added:

  • WorkerPoolSize: the number of Workers in the worker pool. Since each queue in TaskQueue corresponds to one Worker, the number of queues created in TaskQueue must match the number of Workers.

  • TaskQueue: a collection of channels holding Request messages. It buffers the Request data that the Workers consume; each Worker retrieves and processes client requests from its own queue.

Ideally, WorkerPoolSize should be read from GlobalObject so that it can be configured manually in the zinx.json file. The configuration-loading code is modified as follows:

// zinx/utils/globalobj.go

/*
    Stores all the global parameters of the Zinx framework for other modules to use
    Some parameters can also be configured by the user through the zinx.json file
*/
type GlobalObj struct {
    /*
        Server
    */
    TcpServer        ziface.IServer // Current global Server object of Zinx
    Host             string        // IP of the current server host
    TcpPort          int           // Listening port number of the current server host
    Name             string        // Name of the current server

    /*
        Zinx
    */
    Version          string // Current Zinx version number
    MaxPacketSize    uint32 // Maximum size of data packet
    MaxConn          int    // Maximum number of connections allowed on the current server host
    WorkerPoolSize   uint32 // Number of workers in the business worker pool
    MaxWorkerTaskLen uint32 // Maximum number of tasks stored in the task queue corresponding to each business worker

    /*
        config file path
    */
    ConfFilePath string
}

// Provide init() method, automatically loaded
func init() {
    // Initialize the GlobalObject variable and set some default values
    GlobalObject = &GlobalObj{
        Name:             "ZinxServerApp",
        Version:          "V0.4",
        TcpPort:          7777,
        Host:             "0.0.0.0",
        MaxConn:          12000,
        MaxPacketSize:    4096,
        ConfFilePath:     "conf/zinx.json",
        WorkerPoolSize:   10,
        MaxWorkerTaskLen: 1024,
    }

    // Load some user-configured parameters from the configuration file
    GlobalObject.Reload()
}

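With these defaults in place, the two new options can also be configured explicitly in conf/zinx.json. A minimal configuration for this chapter's demo might look like the following (a sketch; the key names are assumed to mirror the GlobalObj field names, as in the global configuration chapter):

{
    "Name": "zinx v-0.8 demoApp",
    "Host": "127.0.0.1",
    "TcpPort": 7777,
    "MaxConn": 3,
    "WorkerPoolSize": 10,
    "MaxWorkerTaskLen": 1024
}

If WorkerPoolSize is omitted, the default of 10 set in init() is used.
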
8.2 Creating and Starting the Worker Pool

We will now add the Worker pool on top of the Zinx V0.7 project, starting with the interface methods for starting the Worker pool. These are defined in the abstraction layer IMsgHandle as follows:

// zinx/ziface/imsghandler.go

/*
    Message Management Abstract Layer
*/
type IMsgHandle interface {
    DoMsgHandler(request IRequest)                          // Non-blocking message handling
    AddRouter(msgId uint32, router IRouter)                 // Add specific handling logic for a message
    StartWorkerPool()                                       // Start the worker pool
    SendMsgToTaskQueue(request IRequest)                     // Send the message to the TaskQueue to be processed by the worker
}

The two new interface methods have the following meanings:

  • StartWorkerPool(): This method starts the Worker pool. It starts the pool based on the configured WorkerPoolSize and assigns a TaskQueue to each Worker. Each Worker's work is carried out by a Goroutine.

  • SendMsgToTaskQueue(request IRequest): This method provides the interface for sending messages to the TaskQueue, which means handing the message to the Worker pool. This serves as the entry point for the Worker pool.

Let's start with the StartOneWorker() method. In the implementation file msghandler.go, add StartOneWorker() to MsgHandle; it represents the work of a single Worker. Each Worker continuously consumes messages from its corresponding TaskQueue, and when a message arrives, it passes the request to the router for business matching and processing. The code is as follows:

// zinx/znet/msghandler.go

// Start a Worker workflow
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
    fmt.Println("Worker ID =", workerID, "is started.")
    // Continuously wait for messages in the queue
    for {
        select {
        // If there is a message, take the Request from the queue and execute the bound business method
        case request := <-taskQueue:
            mh.DoMsgHandler(request)
        }
    }
}

The business logic of each Worker is to continuously consume messages from its own TaskQueue. When a message arrives, the Request is retrieved from the queue and handed to the router for processing, after which the Worker blocks again, waiting for the next message.

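Because the select above has only one case, the same Worker loop can also be written with a channel range. The following is only an equivalent sketch, not part of the Zinx source; its behavior matches the select version as long as the task queue channel is never closed:

// Alternative sketch of StartOneWorker (not the Zinx source); msghandler.go already imports fmt and ziface
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
    fmt.Println("Worker ID =", workerID, "is started.")
    // Receiving with range blocks exactly like the single-case select,
    // and the loop also exits cleanly if the channel is ever closed.
    for request := range taskQueue {
        mh.DoMsgHandler(request)
    }
}
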
The next step is to start all the Workers, associate each Worker with its corresponding TaskQueue, and ensure that they are working together. This is the purpose of the StartWorkerPool() method. The implementation logic is as follows:

// zinx/znet/msghandler.go

// Start the worker pool
func (mh *MsgHandle) StartWorkerPool() {
    // Start the required number of workers
    for i := 0; i < int(mh.WorkerPoolSize); i++ {
        // A worker is started
        // Allocate space for the current worker's task queue
        mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
        // Start the current worker, blocking and waiting for messages in the corresponding task queue
        go mh.StartOneWorker(i, mh.TaskQueue[i])
    }
}

The StartWorkerPool() method starts the number of Workers specified by WorkerPoolSize. Each Worker is bound to its TaskQueue through the index i into the TaskQueue slice, and a Goroutine is launched to carry out that Worker's work; the Worker then blocks, waiting for messages to arrive in its assigned queue. Note that each channel must be allocated with make before it can be used.

8.3 Sending Messages to the Worker Pool

Now that the Worker pool is ready, we need an entry point to send messages to the Worker pool. Next, we will implement the SendMsgToTaskQueue() method. The code is as follows:

// zinx/znet/msghandler.go

// Send the message to the TaskQueue to be processed by the worker
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
    // Assign the current connection to the worker responsible for processing this connection based on ConnID
    // Round-robin average allocation policy

    // Get the workerID responsible for processing this connection
    workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize

    fmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)

    // Send the request message to the task queue
    mh.TaskQueue[workerID] <- request
}

SendMsgToTaskQueue() is the entry point to the Worker pool. It allocates requests to Workers with a simple modulo rule: the connection's ConnID is taken modulo WorkerPoolSize to obtain the workerID. Because connection IDs are assigned sequentially, connections are spread evenly across the Workers, and every request from a given connection is handled by the same Worker.

Finally, the Request is sent to the corresponding Worker's TaskQueue, and that Worker's Goroutine handles the request for the connection.

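To make the allocation concrete: with the default WorkerPoolSize of 10, ConnID 0 maps to workerID 0, ConnID 1 to workerID 1, ConnID 12 to workerID 2, and so on. Because every request from a given connection carries the same ConnID, all of that connection's requests land in the same TaskQueue and are processed in order by a single Worker.
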
8.4 Implementation of Zinx-V0.8

In this section, we will integrate the message queue and multi-tasking Worker mechanism into Zinx, upgrading it to version V0.8. First, in the Start() method of the Server, we will start the Worker pool before the logic reaches the Accept() method. The additional code is as follows:

// zinx/znet/server.go

// Start the network service
func (s *Server) Start() {

    // ... (omitted code)

    // Start a goroutine to handle the server listener business
    go func() {
        // 0 Start the Worker pool mechanism
        s.msgHandler.StartWorkerPool()

        // 1 Get a TCP Addr
        addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
        if err != nil {
            fmt.Println("resolve tcp addr err: ", err)
            return
        }

        // ... (omitted code)
    }()
}

Next, once the Server has accepted a client connection and data arrives on it, the data should be sent to the Worker pool for processing. We therefore add the following logic to the StartReader() method of Connection:

// zinx/znet/connection.go

/*
    Read message goroutine, used to read data from the client
*/
func (c *Connection) StartReader() {
    fmt.Println("Reader Goroutine is running")
    defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
    defer c.Stop()

    for {
        // ... (omitted code)

        // Get the current client's Request data
        req := Request{
            conn: c,
            msg:  msg,
        }

        if utils.GlobalObject.WorkerPoolSize > 0 {
            // Worker pool mechanism has been started, send the message to the Worker for processing
            c.MsgHandler.SendMsgToTaskQueue(&req)
        } else {
            // Execute the corresponding Handle method from the bound message and its corresponding processing method
            go c.MsgHandler.DoMsgHandler(&req)
        }
    }
}

The StartReader() method of Connection does not force the use of the multi-tasking Worker mechanism. Instead, it checks whether the user has configured a non-zero WorkerPoolSize. If it is greater than 0, the Worker mechanism handles the connection's request messages; if it is 0, the previous logic remains in effect and a temporary Goroutine is started for each client request.

8.5 Implementing the Application with Zinx-V0.8

Now let's verify that the Worker pool works correctly. Because the external interface of the Zinx framework has not changed, the test code is the same as in V0.6 and V0.7. Start the Server application and the Client applications in four separate terminals:

$ go run Server.go
$ go run Client0.go
$ go run Client1.go
$ go run Client0.go

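For reference, Server.go is essentially the demo server from the previous two chapters. The sketch below assumes the PingRouter and HelloZinxRouter demo routers and the NewServer()/AddRouter()/Serve() API introduced in earlier chapters; treat it as an illustration rather than the exact test code:

// Server.go (sketch based on the demo from the previous chapters)
package main

import (
    "fmt"

    "zinx/ziface"
    "zinx/znet"
)

// PingRouter handles msgId 0 and replies with a ping message
type PingRouter struct {
    znet.BaseRouter
}

func (r *PingRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call PingRouter Handle")
    fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
    if err := request.GetConnection().SendMsg(0, []byte("ping...ping...ping")); err != nil {
        fmt.Println(err)
    }
}

// HelloZinxRouter handles msgId 1 and replies with a hello message
type HelloZinxRouter struct {
    znet.BaseRouter
}

func (r *HelloZinxRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call HelloZinxRouter Handle")
    fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
    if err := request.GetConnection().SendMsg(1, []byte("Hello Zinx Router V0.8")); err != nil {
        fmt.Println(err)
    }
}

func main() {
    s := znet.NewServer()              // configuration, including WorkerPoolSize, is read from conf/zinx.json
    s.AddRouter(0, &PingRouter{})      // msgId 0 -> ping business
    s.AddRouter(1, &HelloZinxRouter{}) // msgId 1 -> hello business
    s.Serve()                          // Start() launches the Worker pool before Accept()
}
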
The output of the server application is as follows:

$ go run Server.go
Add api msgId =  0
Add api msgId =  1
[START] Server name: zinx v-0.8 demoApp, listener at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
Worker ID =  4  is started.
start Zinx server   zinx v-0.8 demoApp  succ, now listening...
Worker ID =  9  is started.
Worker ID =  0  is started.
Worker ID =  5  is started.
Worker ID =  6  is started.
Worker ID =  1  is started.
Worker ID =  2  is started.
Worker ID =  7  is started.
Worker ID =  8  is started.
Worker ID =  3  is started.
Reader Goroutine is  running
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Reader Goroutine is  running
Add ConnID= 1  request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Reader Goroutine is  running
Add ConnID= 2  request msgID= 0 to workerID= 2
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 1  request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 2  request msgID= 0 to workerID= 2
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 1  request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0  request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
//... (truncated output)

The output of client 0 is as follows:

$ go run Client0.go
Client Test ... start
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
//... (truncated output)

The output of client 1 is as follows:

$ go run Client1.go
Client Test ... start
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
//... (truncated output)

The output of client 2 (which runs the same code as client 0) is as follows:

$ go run Client0.go
Client Test ... start
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
//... (truncated output)

8.6 Conclusion

Based on the results above, the message queue and task Worker pool are working. But what is the significance of the WorkerPool design?

In the current design of the Zinx framework, when a client connection comes in, the server starts a Reader Goroutine and a Writer Goroutine for it. These two Goroutines handle I/O reads and writes, and they remain blocked when no data is moving on the network. As mentioned in previous chapters, a blocked Goroutine does not consume CPU. While data is being transmitted, the CPU performs the I/O read/write work, a cost the developer cannot (and should not try to) optimize away. What remains, then, is the business-logic processing of the data once it has been read into the application's user-space memory.

If the framework did not add the WorkerPool mechanism, overall functionality would be unaffected. However, if each connection gets one Goroutine for its business processing, the number of Goroutines grows in step with the number of connections. These Goroutines rarely block; they mainly perform business computation, which gives business processing maximum priority but also drives the frequency of context switching sharply upward. Once that frequency exceeds what the hardware can sustain, the program immediately slows down, and if connections keep increasing, a vicious cycle sets in that exhausts the program's resources and kills the process. The WorkerPool effectively caps the context switching caused by business processing: a large number of connections are funneled into a fixed number of Workers, keeping the processing rate under control.


source code

https://github.com/aceld/zinx/blob/master/examples/zinx_release/zinx-v0.8.tar.gz


[Zinx]

<1.Building Basic Services with Zinx Framework>
<2. Zinx-V0.2 Simple Connection Encapsulation and Binding with Business>
<3.Design and Implementation of the Zinx Framework's Routing Module>
<4.Zinx Global Configuration>
<5.Zinx Message Encapsulation Module Design and Implementation>
<6.Design and Implementation of Zinx Multi-Router Mode>
<7. Building Zinx's Read-Write Separation Model>
<8.Zinx Message Queue and Task Worker Pool Design and Implementation>
<9. Zinx Connection Management and Property Setting>

[Zinx Application - MMO Game Case Study]

<10. Application Case Study using the Zinx Framework>
<11. MMO Online Game AOI Algorithm>
<12.Data Transmission Protocol: Protocol Buffers>
<13. MMO Game Server Application Protocol>
<14. Building the Project and User Login>
<15. World Chat System Implementation>
<16. Online Location Information Synchronization>
<17. Moving position and non-crossing grid AOI broadcasting>
<18. Player Logout>
<19. Movement and AOI Broadcast Across Grids>


Author:
discord: https://discord.gg/xQ8Xxfyfcz
zinx: https://github.com/aceld/zinx
github: https://github.com/aceld
aceld's home: https://yuque.com/aceld
