Hertz's service discovery can be configured with a load balancer to achieve high service availability and balanced traffic.
Hertz
Hertz is an enterprise-level microservice HTTP framework built for very large scale, featuring ease of use, easy extensibility, and low latency.
Hertz uses the self-developed high-performance network library Netpoll by default. In some special scenarios, Hertz has certain advantages in QPS and latency compared to Go's net package.
In internal practice, some typical services, such as gateways and services in which the framework accounts for a large share of the work, use significantly fewer resources after migrating from the Gin framework to Hertz: CPU usage drops by 30%-60%, depending on the traffic volume.
For more details, see cloudwego/hertz.
Weighted Random Algorithm
Load balancing in Hertz is implemented using a weighted random algorithm, defined in pkg/app/client/loadbalance/weight_random.go. First we need to define the weightedBalancer structure.
type weightedBalancer struct {
    cachedWeightInfo sync.Map
    sfg              singleflight.Group
}
It contains a sync.Map that caches weight information and a singleflight.Group. We need to make weightedBalancer implement the four methods defined in Loadbalancer.
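To make the shape of those four methods concrete, here is a minimal sketch of such an interface together with a trivial implementation. Instance, Result, staticInstance and firstBalancer are hypothetical stand-ins written for this example, and the signatures are inferred from the snippets in this article rather than copied from the Hertz source.

```go
package main

import "fmt"

// Instance is a simplified, hypothetical stand-in for discovery.Instance.
type Instance interface {
    Address() string
    Weight() int
}

// Result is a simplified, hypothetical stand-in for discovery.Result.
type Result struct {
    CacheKey  string
    Instances []Instance
}

// Loadbalancer lists the four methods this article walks through.
// The signatures are inferred from the snippets, not copied from Hertz.
type Loadbalancer interface {
    Pick(Result) Instance
    Rebalance(Result)
    Delete(string)
    Name() string
}

// staticInstance is a hypothetical Instance used only for this demo.
type staticInstance struct {
    addr   string
    weight int
}

func (s staticInstance) Address() string { return s.addr }
func (s staticInstance) Weight() int     { return s.weight }

// firstBalancer is a deliberately trivial balancer: it always picks the
// first instance and keeps no cache, so Rebalance and Delete do nothing.
type firstBalancer struct{}

func (firstBalancer) Pick(e Result) Instance {
    if len(e.Instances) == 0 {
        return nil
    }
    return e.Instances[0]
}
func (firstBalancer) Rebalance(Result) {}
func (firstBalancer) Delete(string)    {}
func (firstBalancer) Name() string     { return "first" }

// Compile-time check that firstBalancer satisfies the interface.
var _ Loadbalancer = firstBalancer{}

func main() {
    var lb Loadbalancer = firstBalancer{}
    e := Result{CacheKey: "demo", Instances: []Instance{staticInstance{"127.0.0.1:8001", 10}}}
    fmt.Println(lb.Name(), lb.Pick(e).Address())
}
```

Any balancer that satisfies an interface of this shape can be swapped in without touching the caller, which is why both algorithms in this article expose the same four methods.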
calcWeightInfo
This is one of the core methods of the algorithm. It returns a weight information structure that contains the instances, their respective weights, and the total weight.
type weightInfo struct {
    instances []discovery.Instance
    entries   []int
    weightSum int
}
First, we declare a variable w of type *weightInfo, initialize instances to a slice of discovery.Instance whose length is the number of incoming instances, initialize weightSum to 0, and initialize entries to an int slice of the same length.
w := &weightInfo{
    instances: make([]discovery.Instance, len(e.Instances)),
    weightSum: 0,
    entries:   make([]int, len(e.Instances)),
}
Next comes the core part of this method. We traverse the incoming instances with the subscript idx. When the weight of an instance is greater than 0, we store that instance in w.instances at subscript cnt, store its weight in w.entries at the same subscript, add the weight to weightSum, and increment cnt. Otherwise (the weight is 0 or negative), we log a warning. Finally, we truncate w.instances to its first cnt elements.
var cnt int
for idx := range e.Instances {
    weight := e.Instances[idx].Weight()
    if weight > 0 {
        w.instances[cnt] = e.Instances[idx]
        w.entries[cnt] = weight
        w.weightSum += weight
        cnt++
    } else {
        hlog.SystemLogger().Warnf("Invalid weight=%d on instance address=%s", weight, e.Instances[idx].Address())
    }
}
w.instances = w.instances[:cnt]
return w
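The filtering step can be tried in isolation with a small self-contained sketch. The instance type below is a hypothetical stand-in for discovery.Instance, and fmt.Printf replaces the hlog warning; unlike the snippet above, this sketch also trims entries so both slices have the same length.

```go
package main

import "fmt"

// instance is a hypothetical stand-in for discovery.Instance.
type instance struct {
    addr   string
    weight int
}

type weightInfo struct {
    instances []instance
    entries   []int
    weightSum int
}

// calcWeightInfo keeps only instances with a positive weight, records their
// weights in the same order, and accumulates the total weight.
func calcWeightInfo(in []instance) *weightInfo {
    w := &weightInfo{
        instances: make([]instance, len(in)),
        entries:   make([]int, len(in)),
    }
    cnt := 0 // next free slot among the valid instances
    for idx := range in {
        if weight := in[idx].weight; weight > 0 {
            w.instances[cnt] = in[idx]
            w.entries[cnt] = weight
            w.weightSum += weight
            cnt++
        } else {
            fmt.Printf("skipping %s: invalid weight %d\n", in[idx].addr, weight)
        }
    }
    // Drop the unused tail left behind by skipped instances.
    w.instances = w.instances[:cnt]
    w.entries = w.entries[:cnt]
    return w
}

func main() {
    w := calcWeightInfo([]instance{
        {"127.0.0.1:8001", 10},
        {"127.0.0.1:8002", 0},
        {"127.0.0.1:8003", 5},
    })
    fmt.Println(len(w.instances), w.entries, w.weightSum)
}
```

With the three sample instances, the one with weight 0 is skipped, leaving two instances with a total weight of 15.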
It is worth explaining why two subscripts are recorded separately: instances whose weight is not greater than 0 are skipped, so writing at subscript idx would leave empty slots in the slice where w stores instances, whereas cnt only advances for valid instances. The next four methods implement the methods defined in Loadbalancer.
Pick
We first use the Load method to check whether the incoming CacheKey exists in the cache. If it does not, we call the Do method of singleflight.Group. Do executes the given function and returns its result, ensuring that only one execution is in flight for a given key at a time: if a duplicate key comes in, the duplicate call waits for the original call to complete and receives the same result. As described in the analysis of calcWeightInfo above, we get a value of type *weightInfo and assign it to wi (at this point, wi is still a variable of interface type).
wi, ok := wb.cachedWeightInfo.Load(e.CacheKey)
if !ok {
    wi, _, _ = wb.sfg.Do(e.CacheKey, func() (interface{}, error) {
        return wb.calcWeightInfo(e), nil
    })
    wb.cachedWeightInfo.Store(e.CacheKey, wi)
}
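The "load from cache, otherwise compute once and store" pattern can be sketched with the standard library alone. The group type below is a tiny hypothetical stand-in for singleflight.Group (it is not the real package and drops the error and shared return values), and cache and get are names invented for this example.

```go
package main

import (
    "fmt"
    "sync"
)

// call tracks one in-flight computation.
type call struct {
    wg  sync.WaitGroup
    val interface{}
}

// group is a tiny, hypothetical stand-in for singleflight.Group.
type group struct {
    mu sync.Mutex
    m  map[string]*call
}

// Do runs fn for key unless a call for the same key is already in flight,
// in which case it waits for that call and returns its result.
func (g *group) Do(key string, fn func() interface{}) interface{} {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        g.mu.Unlock()
        c.wg.Wait()
        return c.val
    }
    c := &call{}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    c.val = fn()
    c.wg.Done()

    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()
    return c.val
}

// cache mirrors the balancer's two fields: a sync.Map plus a call group.
type cache struct {
    entries sync.Map
    sfg     group
}

// get returns the cached value for key, computing and storing it on a miss.
func (c *cache) get(key string, compute func() interface{}) interface{} {
    if v, ok := c.entries.Load(key); ok {
        return v
    }
    v := c.sfg.Do(key, compute)
    c.entries.Store(key, v)
    return v
}

func main() {
    c := &cache{}
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.get("hertz.test.demo", func() interface{} { return "weight info" })
        }()
    }
    wg.Wait()
    v, ok := c.entries.Load("hertz.test.demo")
    fmt.Println(v, ok)
}
```

The call group only deduplicates calls that overlap in time; once a value has been stored, later lookups are served from the sync.Map without calling Do at all.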
Once the weight information is available, we convert wi to *weightInfo through a type assertion and assign it to w. When the total weight is not greater than 0, we return nil. Otherwise we call the Intn function in the fastrand package to obtain a random number in the range [0, weightSum), traverse the instances of w in a for loop, subtract the weight of each traversed instance from the random number, and return the instance at which the number first drops below 0.
w := wi.(*weightInfo)
if w.weightSum <= 0 {
    return nil
}
weight := fastrand.Intn(w.weightSum)
for i := 0; i < len(w.instances); i++ {
    weight -= w.entries[i]
    if weight < 0 {
        return w.instances[i]
    }
}
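The subtraction loop is easier to reason about when the random number is passed in explicitly. The following sketch is an illustration only: pickByOffset and pick are hypothetical helper names, and math/rand is used in place of the fastrand package so that the example depends only on the standard library.

```go
package main

import (
    "fmt"
    "math/rand"
)

// pickByOffset subtracts each weight from offset in turn and returns the index
// at which offset first drops below zero, or -1 if that never happens.
func pickByOffset(weights []int, offset int) int {
    for i, w := range weights {
        offset -= w
        if offset < 0 {
            return i
        }
    }
    return -1
}

// pick draws a random offset in [0, sum) and maps it to an index, so that
// index i is chosen with probability weights[i]/sum.
func pick(weights []int) int {
    sum := 0
    for _, w := range weights {
        sum += w
    }
    if sum <= 0 {
        return -1
    }
    return pickByOffset(weights, rand.Intn(sum))
}

func main() {
    weights := []int{10, 30, 60}
    counts := make([]int, len(weights))
    for i := 0; i < 100000; i++ {
        counts[pick(weights)]++
    }
    // The counts come out roughly proportional to 10:30:60.
    fmt.Println(counts)
}
```

With weights 10, 30 and 60, offsets 0 to 9 map to the first instance, 10 to 39 to the second, and 40 to 99 to the third, which is what makes the selection proportional to the weights.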
Rebalance
The Rebalance method directly passes the CacheKey and the weightInfo obtained from the calcWeightInfo method into the Store method.
// Rebalance implements the Loadbalancer interface.
func (wb *weightedBalancer) Rebalance(e discovery.Result) {
    wb.cachedWeightInfo.Store(e.CacheKey, wb.calcWeightInfo(e))
}
Delete
The Delete method calls the Delete method of sync.Map to remove the cached entry for the incoming cacheKey directly.
// Delete implements the Loadbalancer interface.
func (wb *weightedBalancer) Delete(cacheKey string) {
    wb.cachedWeightInfo.Delete(cacheKey)
}
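Taken together, Rebalance and Delete manage the lifetime of a cache entry: one overwrites it, the other removes it. A minimal sketch of that lifecycle on a sync.Map follows; balancerCache and its methods are hypothetical names, and a plain []string stands in for the cached weight information.

```go
package main

import (
    "fmt"
    "sync"
)

// balancerCache is a hypothetical, simplified cache keyed by cache key.
type balancerCache struct {
    entries sync.Map
}

// rebalance unconditionally replaces whatever is cached for key.
func (c *balancerCache) rebalance(key string, instances []string) {
    c.entries.Store(key, instances)
}

// remove drops the cached entry for key; removing a missing key is a no-op.
func (c *balancerCache) remove(key string) {
    c.entries.Delete(key)
}

// lookup returns the cached instances for key, if any.
func (c *balancerCache) lookup(key string) ([]string, bool) {
    v, ok := c.entries.Load(key)
    if !ok {
        return nil, false
    }
    return v.([]string), true
}

func main() {
    c := &balancerCache{}
    c.rebalance("hertz.test.demo", []string{"127.0.0.1:8001"})
    c.rebalance("hertz.test.demo", []string{"127.0.0.1:8001", "127.0.0.1:8002"})
    got, _ := c.lookup("hertz.test.demo")
    fmt.Println(len(got)) // the second rebalance replaced the first entry

    c.remove("hertz.test.demo")
    _, ok := c.lookup("hertz.test.demo")
    fmt.Println(ok)
}
```

Because Store overwrites rather than merges, a rebalance always reflects the latest set of instances, and a lookup after a removal falls back to the cache-miss path.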
Name
The Name method directly returns the name of the algorithm, which is "weight_random".
func (wb *weightedBalancer) Name() string {
    return "weight_random"
}
Round Robin Algorithm
The loadbalance extension library also provides a load balancing implementation based on the round-robin (polling) algorithm. Similarly, the roundRobinBalancer structure needs to be defined first.
type roundRobinBalancer struct {
    cachedInfo sync.Map
    sfg        singleflight.Group
}
It contains a sync.Map that caches round-robin information and a singleflight.Group. We need to make roundRobinBalancer implement the four methods defined in Loadbalancer.
roundRobinInfo
Because the round-robin algorithm does not use weights, its structure is much simpler than that of the weighted random algorithm. It contains the service instances and an index that records the current round-robin position.
type roundRobinInfo struct {
    instances []discovery.Instance
    index     uint32
}
Pick
As in the weighted random algorithm, Pick first checks whether the CacheKey exists in the cache. The difference is that the index is set to 0 during initialization.
ri, ok := rr.cachedInfo.Load(e.CacheKey)
if !ok {
    ri, _, _ = rr.sfg.Do(e.CacheKey, func() (interface{}, error) {
        return &roundRobinInfo{
            instances: e.Instances,
            index:     0,
        }, nil
    })
    rr.cachedInfo.Store(e.CacheKey, ri)
}
The next step is the main logic of the round-robin algorithm. First, if there are no instances, we return nil. Otherwise, we add 1 to the index with an atomic operation, which prevents data races under concurrent access, and use the value before the increment, modulo the number of instances, as the position of the instance to return.
r := ri.(*roundRobinInfo)
if len(r.instances) == 0 {
    return nil
}
newIdx := atomic.AddUint32(&r.index, 1)
return r.instances[(newIdx-1)%uint32(len(r.instances))]
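The atomic counter can be exercised on its own with a small sketch. The roundRobin type below is a hypothetical simplification that stores plain address strings instead of discovery.Instance values and returns a boolean instead of nil when there is nothing to pick.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// roundRobin is a hypothetical, simplified version of roundRobinInfo.
type roundRobin struct {
    instances []string
    index     uint32
}

// pick atomically increments the index and returns the instance at the
// previous position, wrapping around at the end of the slice.
func (r *roundRobin) pick() (string, bool) {
    if len(r.instances) == 0 {
        return "", false
    }
    newIdx := atomic.AddUint32(&r.index, 1)
    return r.instances[(newIdx-1)%uint32(len(r.instances))], true
}

func main() {
    rr := &roundRobin{instances: []string{"127.0.0.1:8001", "127.0.0.1:8002", "127.0.0.1:8003"}}

    var (
        mu     sync.Mutex
        wg     sync.WaitGroup
        counts = map[string]int{}
    )
    for i := 0; i < 300; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            addr, _ := rr.pick()
            mu.Lock()
            counts[addr]++
            mu.Unlock()
        }()
    }
    wg.Wait()
    // Every increment yields a unique index, so each address is picked
    // exactly 100 times even though the goroutines run concurrently.
    fmt.Println(counts)
}
```

Because atomic.AddUint32 hands every caller a distinct counter value, no two concurrent picks can observe the same position, and no lock is needed around the index itself.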
Rebalance
The Rebalance method directly passes the CacheKey and a freshly initialized roundRobinInfo into the Store method.
// Rebalance implements the Loadbalancer interface.
func (rr *roundRobinBalancer) Rebalance(e discovery.Result) {
    rr.cachedInfo.Store(e.CacheKey, &roundRobinInfo{
        instances: e.Instances,
        index:     0,
    })
}
Delete
The Delete method calls the Delete method of sync.Map to remove the cached entry for the incoming cacheKey directly.
// Delete implements the Loadbalancer interface.
func (rr *roundRobinBalancer) Delete(cacheKey string) {
    rr.cachedInfo.Delete(cacheKey)
}
Name
The Name method directly returns the name of the algorithm, which is "round_robin".
// Name implements the Loadbalancer interface.
func (rr *roundRobinBalancer) Name() string {
    return "round_robin"
}
How to use
Server
package main
import (
    "context"
    "github.com/cloudwego/hertz/pkg/app"
    "github.com/cloudwego/hertz/pkg/app/server"
    "github.com/cloudwego/hertz/pkg/app/server/registry"
    "github.com/cloudwego/hertz/pkg/common/utils"
    "github.com/cloudwego/hertz/pkg/protocol/consts"
    "github.com/hertz-contrib/registry/etcd"
)
func main() {
    r, err := etcd.NewEtcdRegistry([]string{"127.0.0.1:2379"})
    if err != nil {
        panic(err)
    }
    addr := "127.0.0.1:8888"
    h := server.Default(
        server.WithHostPorts(addr),
        server.WithRegistry(r, &registry.Info{
            ServiceName: "hertz.test.demo",
            Addr:        utils.NewNetAddr("tcp", addr),
            Weight:      10,
            Tags:        nil,
        }))
    h.GET("/ping", func(_ context.Context, ctx *app.RequestContext) {
        ctx.JSON(consts.StatusOK, utils.H{"ping": "pong2"})
    })
    h.Spin()
}
Using etcd as our service discovery center, we call the registry extension to build the simplest Hertz server. Start three instances at the same time on different ports to simulate different servers.
Client
package main
import (
    "context"
    "github.com/cloudwego/hertz/pkg/app/client"
    "github.com/cloudwego/hertz/pkg/app/middlewares/client/sd"
    "github.com/cloudwego/hertz/pkg/common/config"
    "github.com/cloudwego/hertz/pkg/common/hlog"
    "github.com/hertz-contrib/registry/etcd"
)
func main() {
    cli, err := client.NewClient()
    if err != nil {
        panic(err)
    }
    r, err := etcd.NewEtcdResolver([]string{"127.0.0.1:2379"})
    if err != nil {
        panic(err)
    }
    cli.Use(sd.Discovery(r))
    for i := 0; i < 10; i++ {
        status, body, err := cli.Get(context.Background(), nil, "http://hertz.test.demo/ping", config.WithSD(true))
        if err != nil {
            hlog.Fatal(err)
        }
        hlog.Infof("HERTZ: code=%d,body=%s", status, string(body))
    }
}
The weighted random algorithm is used by default. The test below was run with the round-robin algorithm configured for service discovery, which the snippet above does not show.
Test
2022/12/22 10:21:34.117821 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8003"}
2022/12/22 10:21:34.118488 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8001"}
2022/12/22 10:21:34.119135 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8002"}
2022/12/22 10:21:34.119339 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8003"}
2022/12/22 10:21:34.119518 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8001"}
2022/12/22 10:21:34.119721 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8002"}
2022/12/22 10:21:34.119929 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8003"}
2022/12/22 10:21:34.120109 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8001"}
2022/12/22 10:21:34.120243 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8002"}
2022/12/22 10:21:34.120384 main.go:50: [Info] code=200,body={"addr":"127.0.0.1:8003"}
The test results show that the address to access is selected in round-robin order (8003, 8001, 8002, and so on), so the test is successful.