L2ncE

Load balancing expansion from 0 to 1

Hertz's service discovery can be configured with a load balancer to achieve high availability and to spread traffic evenly across service instances.

Hertz

Hertz is an HTTP framework for microservices, built for very large-scale enterprise use and designed for ease of use, extensibility, and low latency.

Hertz uses its in-house high-performance network library Netpoll by default. In some scenarios, Hertz has advantages in QPS and latency over Go's standard net package.

In internal practice, typical services such as gateways and other services in which the framework accounts for a large share of the work used significantly fewer resources after migrating from the Gin framework to Hertz: CPU usage dropped by 30% to 60%, depending on traffic volume.

For more details, see cloudwego/hertz.

Weighted Random Algorithm

Load balancing in Hertz is implemented using a weighted random algorithm, defined in pkg/app/client/loadbalance/weight_random.go. First we need to define the weightedBalancer structure.

type weightedBalancer struct {
    cachedWeightInfo sync.Map           // CacheKey -> *weightInfo
    sfg              singleflight.Group // deduplicates concurrent calcWeightInfo calls per key
}

It contains a sync.Map that caches weight information and a singleflight.Group. weightedBalancer must implement the four methods defined in the Loadbalancer interface.
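
To make the four methods concrete, here is a minimal sketch of what such an interface can look like. The Instance and Result types are simplified stand-ins for discovery.Instance and discovery.Result, and firstBalancer is a hypothetical implementation invented for this example; none of them are Hertz's actual definitions.

```go
package main

import "fmt"

// Instance and Result are simplified stand-ins for discovery.Instance and
// discovery.Result; they are assumptions made for this sketch only.
type Instance struct {
	Addr   string
	Weight int
}

type Result struct {
	CacheKey  string
	Instances []Instance
}

// Loadbalancer lists the four methods named in this article.
type Loadbalancer interface {
	Pick(e Result) *Instance // choose one instance, or nil if none is usable
	Rebalance(e Result)      // refresh the cached state for e.CacheKey
	Delete(cacheKey string)  // drop the cached state for a key
	Name() string            // report the algorithm name
}

// firstBalancer is a hypothetical, deliberately trivial implementation
// that always returns the first instance.
type firstBalancer struct{}

func (firstBalancer) Pick(e Result) *Instance {
	if len(e.Instances) == 0 {
		return nil
	}
	return &e.Instances[0]
}
func (firstBalancer) Rebalance(Result) {}
func (firstBalancer) Delete(string)    {}
func (firstBalancer) Name() string     { return "first" }

// Compile-time check that firstBalancer satisfies the interface.
var _ Loadbalancer = firstBalancer{}

func main() {
	var lb Loadbalancer = firstBalancer{}
	res := Result{
		CacheKey:  "demo",
		Instances: []Instance{{Addr: "127.0.0.1:8001", Weight: 10}},
	}
	fmt.Println(lb.Name(), lb.Pick(res).Addr)
}
```

The compile-time assertion `var _ Loadbalancer = firstBalancer{}` is a common Go idiom for catching a missing method early, and it works the same way for a real balancer type.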

calcWeightInfo

This method is the core of the algorithm. It returns a weight information structure that holds the usable instances, their respective weights, and the total weight.

type weightInfo struct {
    instances []discovery.Instance // instances with a positive weight
    entries   []int                // weight of each instance, by index
    weightSum int                  // sum of all entries
}

First, we declare a variable w of type *weightInfo. instances and entries are both allocated as slices whose length equals the number of incoming instances, and weightSum is initialized to 0.

w := &weightInfo{
   instances: make([]discovery.Instance, len(e.Instances)),
   weightSum: 0,
   entries: make([]int, len(e.Instances)),
}

Next comes the core of the method. We traverse the incoming instances with the index idx. When an instance's weight is greater than 0, we copy the instance into w.instances at index cnt, store its weight in w.entries at the same index, add the weight to weightSum, and increment cnt. When the weight is 0 or negative, we log a warning and skip the instance. Finally, we truncate w.instances to its first cnt elements.

var cnt int

for idx := range e.Instances {
   weight := e.Instances[idx].Weight()
   if weight > 0 {
     w.instances[cnt] = e.Instances[idx]
     w.entries[cnt] = weight
     w.weightSum += weight
     cnt++
   } else {
     hlog.SystemLogger().Warnf("Invalid weight=%d on instance address=%s", weight, e.Instances[idx].Address())
   }
}

w.instances = w.instances[:cnt]

return w

Why keep two separate indices? If instances with a non-positive weight were skipped while writing at idx, the slice in w would contain empty (zero-value) slots; writing at cnt keeps the valid instances contiguous. The next four sections cover the methods defined in Loadbalancer in turn.
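
To make the two-index compaction in calcWeightInfo concrete, the following self-contained sketch applies the same idea to a small input. The instance type is a hypothetical stand-in for discovery.Instance, logging is omitted, and unlike the snippet above it also trims entries, which is a choice made here for tidiness rather than something the original does.

```go
package main

import "fmt"

// instance is a hypothetical stand-in for discovery.Instance.
type instance struct {
	addr   string
	weight int
}

// weightInfo mirrors the structure described above.
type weightInfo struct {
	instances []instance
	entries   []int
	weightSum int
}

// calcWeightInfo keeps only instances with a positive weight.
// idx walks the input; cnt is the next free slot in the output.
func calcWeightInfo(in []instance) *weightInfo {
	w := &weightInfo{
		instances: make([]instance, len(in)),
		entries:   make([]int, len(in)),
	}
	cnt := 0
	for idx := range in {
		if weight := in[idx].weight; weight > 0 {
			w.instances[cnt] = in[idx]
			w.entries[cnt] = weight
			w.weightSum += weight
			cnt++
		}
	}
	// Trim the unused tail so no zero-value slots remain.
	w.instances = w.instances[:cnt]
	w.entries = w.entries[:cnt]
	return w
}

func main() {
	w := calcWeightInfo([]instance{{"a", 3}, {"b", 0}, {"c", 5}, {"d", -1}})
	fmt.Println(len(w.instances), w.entries, w.weightSum)
}
```

Running it prints `2 [3 5] 8`: instances b and d are dropped, and a and c end up next to each other at positions 0 and 1.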

Pick

Pick first calls Load to check whether weight information is already cached for the incoming CacheKey. If it is not, Pick calls the Do method of singleflight.Group. Do executes the given function and returns its result, ensuring that only one execution is in flight for a given key at a time: a duplicate call with the same key waits for the original call to complete and receives the same result. As described for calcWeightInfo above, the function returns a value of type *weightInfo, which is assigned to wi (at this point wi is still an interface value) and then stored in the cache.

wi, ok := wb.cachedWeightInfo.Load(e.CacheKey)
if !ok {
    wi, _, _ = wb.sfg.Do(e.CacheKey, func() (interface{}, error) {
        return wb.calcWeightInfo(e), nil
    })
    wb.cachedWeightInfo.Store(e.CacheKey, wi)
}
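
The "compute at most once per key under concurrency" idea can be tried out with the standard library alone. The sketch below is not singleflight: it swaps in sync.Map.LoadOrStore plus sync.Once, so the result is kept permanently instead of only being shared among calls that are in flight at the same time. The names lazyEntry and onceCache are invented for this example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lazyEntry computes its value at most once, even under concurrent access.
type lazyEntry struct {
	once  sync.Once
	value int
}

// onceCache is a hypothetical cache keyed by string.
type onceCache struct {
	entries  sync.Map // map[string]*lazyEntry
	computes int32    // how many times a compute function actually ran
}

// get returns the cached value for key, running compute only for the
// first caller; later and concurrent callers wait and reuse the result.
func (c *onceCache) get(key string, compute func() int) int {
	e, _ := c.entries.LoadOrStore(key, &lazyEntry{})
	entry := e.(*lazyEntry)
	entry.once.Do(func() {
		atomic.AddInt32(&c.computes, 1)
		entry.value = compute()
	})
	return entry.value
}

func main() {
	var c onceCache
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.get("hertz.test.demo", func() int { return 42 })
		}()
	}
	wg.Wait()
	fmt.Println(c.get("hertz.test.demo", func() int { return 0 }), atomic.LoadInt32(&c.computes))
}
```

Running it prints `42 1`: fifty goroutines ask for the same key, the compute function runs once, and every caller sees the same value.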

Once the weight information is available, we convert wi to *weightInfo with a type assertion and assign it to w. If the total weight is not greater than 0, we return nil. Otherwise we call Intn from the fastrand package to draw a random value in [0, weightSum), then loop over the instances in w, subtracting each instance's weight from the random value. The first instance that makes the value drop below 0 is returned.

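w := wi.(*weightInfo)
if w.weightSum <= 0 {
   return nil
}

// weight is uniform in [0, weightSum); each instance owns a range as wide as its weight.
weight := fastrand.Intn(w.weightSum)
for i := 0; i < len(w.instances); i++ {
   weight -= w.entries[i]
   if weight < 0 {
     return w.instances[i]
   }
}

// Fallback; not reached while weightSum equals the sum of entries.
return nil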
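
The subtraction loop is easier to follow with fixed numbers. In the sketch below, pickIndex takes the random value as a parameter so that the mapping can be checked by hand, and pickRandom draws the value with math/rand instead of fastrand. Both function names are made up for this example.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickIndex maps a value r in [0, sum(entries)) to an index.
// Each entry owns a contiguous range whose width equals its weight.
func pickIndex(entries []int, r int) int {
	for i, w := range entries {
		r -= w
		if r < 0 {
			return i
		}
	}
	return -1 // only reached if r is outside [0, sum(entries))
}

// pickRandom draws r with math/rand (the article's code uses fastrand).
func pickRandom(entries []int) int {
	sum := 0
	for _, w := range entries {
		sum += w
	}
	if sum <= 0 {
		return -1
	}
	return pickIndex(entries, rand.Intn(sum))
}

func main() {
	entries := []int{10, 20, 30} // ranges: [0,10) [10,30) [30,60)
	counts := make([]int, len(entries))
	for i := 0; i < 60000; i++ {
		counts[pickRandom(entries)]++
	}
	fmt.Println(counts) // roughly proportional to 1:2:3
}
```

With weights 10, 20 and 30, a random value of 9 selects index 0, 10 selects index 1, and 30 selects index 2, so over many draws each instance is chosen in proportion to its weight.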

Rebalance

The Rebalance method recomputes the weight information with calcWeightInfo and stores it under the CacheKey, replacing any previously cached entry.

// Rebalance implements the Loadbalancer interface.
func (wb *weightedBalancer) Rebalance(e discovery.Result) {
    wb.cachedWeightInfo.Store(e.CacheKey, wb.calcWeightInfo(e))
}

Delete

The Delete method calls the Delete method of sync.Map to remove the cached entry for the given cacheKey.

// Delete implements the Loadbalancer interface.
func (wb *weightedBalancer) Delete(cacheKey string) {
    wb.cachedWeightInfo.Delete(cacheKey)
}
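
Taken together, Rebalance and Delete manage the life cycle of one cache entry per CacheKey. The sketch below shows that life cycle with a bare sync.Map; the cache type and its method names are hypothetical, and a plain slice of addresses stands in for the cached weight information.

```go
package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical wrapper that mirrors how the balancer
// keeps per-CacheKey state in a sync.Map.
type cache struct {
	m sync.Map // map[string][]string
}

// rebalance overwrites whatever was cached for key.
func (c *cache) rebalance(key string, addrs []string) { c.m.Store(key, addrs) }

// remove drops the entry; deleting a missing key is a no-op.
func (c *cache) remove(key string) { c.m.Delete(key) }

// lookup reports the cached addresses and whether the key was present.
func (c *cache) lookup(key string) ([]string, bool) {
	v, ok := c.m.Load(key)
	if !ok {
		return nil, false
	}
	return v.([]string), true
}

func main() {
	var c cache
	c.rebalance("demo", []string{"a"})
	c.rebalance("demo", []string{"a", "b"}) // replaces the previous entry
	addrs, ok := c.lookup("demo")
	fmt.Println(addrs, ok)
	c.remove("demo")
	_, ok = c.lookup("demo")
	fmt.Println(ok)
}
```

Running it prints `[a b] true` and then `false`. Because Store replaces the whole value, readers see either the old entry or the new one, never a partially updated slice.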

Name

The Name method returns the name of the algorithm, "weight_random".

func (wb *weightedBalancer) Name() string {
    return "weight_random"
}

Round Robin Algorithm

The loadbalance extension library also provides a load balancer based on the round-robin algorithm. As before, we first define the roundRobinBalancer structure.

type roundRobinBalancer struct {
    cachedInfo sync.Map           // CacheKey -> *roundRobinInfo
    sfg        singleflight.Group // deduplicates concurrent initialization per key
}

It contains a sync.Map that caches per-key information and a singleflight.Group. roundRobinBalancer must implement the four methods defined in the Loadbalancer interface.

roundRobinInfo

Because the round-robin algorithm does not use weights, its cached structure is much simpler than that of the weighted random algorithm. It contains the service instances and an index that records the current position in the rotation.

type roundRobinInfo struct {
    instances []discovery.Instance
    index     uint32
}

Pick

As in the weighted random algorithm, Pick first checks whether the CacheKey is already cached. The difference is that the cached value is a roundRobinInfo whose index is initialized to 0.

ri, ok := rr.cachedInfo.Load(e.CacheKey)
if !ok {
   ri, _, _ = rr.sfg.Do(e.CacheKey, func() (interface{}, error) {
     return &roundRobinInfo{
       instances: e.Instances,
       index: 0,
     }, nil
   })
   rr.cachedInfo.Store(e.CacheKey, ri)
}

Next comes the main logic of the round-robin algorithm. If there are no instances, we return nil. Otherwise we increment the index by 1 with an atomic operation, which prevents data races between concurrent callers, and take the result modulo the number of instances to select the next instance.

r := ri.(*roundRobinInfo)
if len(r.instances) == 0 {
   return nil
}

newIdx := atomic.AddUint32(&r.index, 1)
return r.instances[(newIdx-1)%uint32(len(r.instances))]
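
The same index arithmetic can be run on its own. The sketch below uses plain address strings instead of discovery.Instance, and the roundRobin type is invented for this example; the atomic.AddUint32 call and the modulo expression follow the snippet above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical, self-contained counterpart of roundRobinInfo.
type roundRobin struct {
	addrs []string
	index uint32
}

// pick returns the next address in order. It is safe for concurrent use
// because the counter is advanced with a single atomic add.
func (r *roundRobin) pick() string {
	if len(r.addrs) == 0 {
		return ""
	}
	newIdx := atomic.AddUint32(&r.index, 1)
	// AddUint32 returns the value after the increment, so subtract 1
	// to start the rotation at position 0.
	return r.addrs[(newIdx-1)%uint32(len(r.addrs))]
}

func main() {
	rr := &roundRobin{addrs: []string{"127.0.0.1:8001", "127.0.0.1:8002", "127.0.0.1:8003"}}
	for i := 0; i < 4; i++ {
		fmt.Println(rr.pick())
	}
}
```

The four picks print 127.0.0.1:8001, 127.0.0.1:8002, 127.0.0.1:8003 and then 127.0.0.1:8001 again. The uint32 counter wraps around after 2^32 picks, which can briefly disturb the order when the number of instances does not divide 2^32.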

Rebalance

The Rebalance method stores a freshly initialized roundRobinInfo under the CacheKey, replacing any previously cached entry.

// Rebalance implements the Loadbalancer interface.
func (rr *roundRobinBalancer) Rebalance(e discovery.Result) {
    rr.cachedInfo.Store(e.CacheKey, &roundRobinInfo{
        instances: e.Instances,
        index: 0,
    })
}

Delete

The Delete method calls the Delete method of sync.Map to remove the cached entry for the given cacheKey.

// Delete implements the Loadbalancer interface.
func (rr *roundRobinBalancer) Delete(cacheKey string) {
    rr.cachedInfo.Delete(cacheKey)
}

Name

The Name method returns the name of the algorithm, "round_robin".

// Name implements the Loadbalancer interface.
func (rr *roundRobinBalancer) Name() string {
    return "round_robin"
}

How to use

Server

package main

import (
    "context"

    "github.com/cloudwego/hertz/pkg/app"
    "github.com/cloudwego/hertz/pkg/app/server"
    "github.com/cloudwego/hertz/pkg/app/server/registry"
    "github.com/cloudwego/hertz/pkg/common/utils"
    "github.com/cloudwego/hertz/pkg/protocol/consts"
    "github.com/hertz-contrib/registry/etcd"
)

func main() {
    r, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:2379"})
    if err != nil {
        panic(err)
    }
    addr := "127.0.0.1:8888"
    h := server.Default(
        server.WithHostPorts(addr),
        server.WithRegistry(r, &registry.Info{
            ServiceName: "hertz.test.demo",
            Addr:        utils.NewNetAddr("tcp", addr),
            Weight:      10,
            Tags:        nil,
        }))
    h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) {
        ctx.JSON(consts.StatusOK, utils.H{"ping": "pong2"})
    })
    h.Spin()
}

Using etcd as the service registry, we call the registry extension to build a minimal Hertz server. Start three copies of it on different ports (changing addr for each) to simulate three servers.

Client

package main

import (
    "context"

    "github.com/cloudwego/hertz/pkg/app/client"
    "github.com/cloudwego/hertz/pkg/app/middlewares/client/sd"
    "github.com/cloudwego/hertz/pkg/common/config"
    "github.com/cloudwego/hertz/pkg/common/hlog"
    "github.com/hertz-contrib/registry/etcd"
)

func main() {
    cli, err := client.NewClient()
    if err != nil {
        panic(err)
    }
    r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"})
    if err != nil {
        panic(err)
    }
    cli.Use(sd.Discovery(r))
    for i := 0; i < 10; i++ {
        status, body, err := cli.Get(context.Background(), nil, "http://hertz.test.demo/ping", config.WithSD(true))
        if err != nil {
            hlog.Fatal(err)
        }
        hlog.Infof("HERTZ: code=%d,body=%s", status, string(body))
    }
}

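The weighted random algorithm is used by default, which is what the client above gets from sd.Discovery(r) alone. The test below uses the round-robin algorithm instead, which requires passing the round-robin balancer from hertz-contrib/loadbalance to sd.Discovery through its load-balancing option (sd.WithLoadBalanceOptions).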

Test

2022/12/22 10:21:34.117821 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8003"}
2022/12/22 10:21:34.118488 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8001"}
2022/12/22 10:21:34.119135 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8002"}
2022/12/22 10:21:34.119339 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8003"}
2022/12/22 10:21:34.119518 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8001"}
2022/12/22 10:21:34.119721 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8002"}
2022/12/22 10:21:34.119929 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8003"}
2022/12/22 10:21:34.120109 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8001"}
2022/12/22 10:21:34.120243 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8002"}
2022/12/22 10:21:34.120384 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8003"}

Each server in this run reports its own address in the response body. The addresses repeat in a fixed 8003, 8001, 8002 cycle, which shows that instances are being selected in round-robin order, so the test succeeds.

Reference

cloudwego/hertz

hertz-contrib/loadbalance

hertz-contrib/registry
