Foreword
The previous article walked through how Hertz implements service registration. This article focuses on the other half: service discovery.
Hertz
Hertz is an HTTP framework for microservices at very large enterprise scale, designed for ease of use, extensibility, and low latency.
Hertz uses the self-developed high-performance network library Netpoll by default. In some special scenarios, Hertz has certain advantages in QPS and latency over Go's standard net package.
In internal practice, some typical services, such as gateways and services where the framework accounts for a large share of the work, saw significantly lower resource usage after migrating from Gin to Hertz, with CPU usage reduced by 30%-60% depending on traffic volume.
For more details, see cloudwego/hertz.
Service discovery extension
Hertz supports custom discovery modules, so users can extend it to integrate other registries. The extension interface is defined under pkg/app/client/discovery.
Extension interface
Service discovery interface definition and implementation
There are three methods in the service discovery interface.
- Resolve is the core method of Resolver. It obtains the service discovery result we need for a given target key.
- Target derives, from the peer TargetInfo provided by Hertz, the unique target that Resolve needs. This target is also used as the unique key of the cache.
- Name specifies the unique name of the Resolver. Hertz uses it to cache and reuse the Resolver.
type Resolver interface {
	// Target should return a description for the given target that is suitable for being a key for cache.
	Target(ctx context.Context, target *TargetInfo) string

	// Resolve returns a list of instances for the given description of a target.
	Resolve(ctx context.Context, desc string) (Result, error)

	// Name returns the name of the resolver.
	Name() string
}
Further down in discovery.go, SynthesizedResolver implements these three methods by delegating to user-supplied functions.
// SynthesizedResolver synthesizes a Resolver using a resolve function.
type SynthesizedResolver struct {
	TargetFunc  func(ctx context.Context, target *TargetInfo) string
	ResolveFunc func(ctx context.Context, key string) (Result, error)
	NameFunc    func() string
}

func (sr SynthesizedResolver) Target(ctx context.Context, target *TargetInfo) string {
	if sr.TargetFunc == nil {
		return ""
	}
	return sr.TargetFunc(ctx, target)
}

func (sr SynthesizedResolver) Resolve(ctx context.Context, key string) (Result, error) {
	return sr.ResolveFunc(ctx, key)
}

// Name implements the Resolver interface
func (sr SynthesizedResolver) Name() string {
	if sr.NameFunc == nil {
		return ""
	}
	return sr.NameFunc()
}
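SynthesizedResolver holds three function fields, one behind each of the three interface methods. Target and Name return an empty string when their function is nil, while Resolve calls ResolveFunc directly, so ResolveFunc must always be set.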
TargetInfo definition
As mentioned above, the Target method derives, from TargetInfo, the unique target that Resolve needs.
type TargetInfo struct {
	Host string
	Tags map[string]string
}
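Because the value returned by Target is used as a cache key, it should be deterministic for the same TargetInfo. The sketch below shows one possible way to build such a key from Host and Tags. The targetKey helper and its key layout are assumptions made for illustration, not something Hertz prescribes, and TargetInfo is redeclared locally so the example runs on its own.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// TargetInfo is a local copy of the struct quoted above.
type TargetInfo struct {
	Host string
	Tags map[string]string
}

// targetKey is a hypothetical helper: it joins the host with the tags in
// sorted order, so the same TargetInfo always maps to the same cache key
// regardless of Go's randomized map iteration order.
func targetKey(t *TargetInfo) string {
	if len(t.Tags) == 0 {
		return t.Host
	}
	keys := make([]string, 0, len(t.Tags))
	for k := range t.Tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	b.WriteString(t.Host)
	for _, k := range keys {
		b.WriteString("|" + k + "=" + t.Tags[k])
	}
	return b.String()
}

func main() {
	info := &TargetInfo{
		Host: "user.service",
		Tags: map[string]string{"zone": "a", "env": "prod"},
	}
	fmt.Println(targetKey(info)) // user.service|env=prod|zone=a
}
```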
Instance interface definition and implementation
Instance contains information about one instance of the target service. There are three methods.
- Address is the address of the target service instance.
- Weight is the weight of the target service instance.
- Tag looks up a tag of the target service instance. Tags are stored as key-value pairs.
// Instance contains information of an instance from the target service.
type Instance interface {
	Address() net.Addr
	Weight() int
	Tag(key string) (value string, exist bool)
}
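These three methods are implemented by the instance struct, also in discovery.go. Note that Weight falls back to registry.DefaultWeight when the configured weight is not positive.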
type instance struct {
	addr   net.Addr
	weight int
	tags   map[string]string
}

func (i *instance) Address() net.Addr {
	return i.addr
}

func (i *instance) Weight() int {
	if i.weight > 0 {
		return i.weight
	}
	return registry.DefaultWeight
}

func (i *instance) Tag(key string) (value string, exist bool) {
	value, exist = i.tags[key]
	return
}
NewInstance
NewInstance creates an Instance from the given network, address, weight and tags.
// NewInstance creates an Instance using the given network, address and tags
func NewInstance(network, address string, weight int, tags map[string]string) Instance {
	return &instance{
		addr:   utils.NewNetAddr(network, address),
		weight: weight,
		tags:   tags,
	}
}
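The weight fallback and the tag lookup are easy to miss when reading the code, so here is a small self-contained sketch of both. Everything in it is a local stand-in: netAddr replaces the address type built by utils.NewNetAddr, newInstance mirrors NewInstance, and defaultWeight stands in for registry.DefaultWeight, whose value of 10 is an assumption in this sketch.

```go
package main

import (
	"fmt"
	"net"
)

// defaultWeight stands in for registry.DefaultWeight; the value 10 is an assumption.
const defaultWeight = 10

// netAddr is a minimal net.Addr implementation used only in this sketch.
type netAddr struct{ network, address string }

func (a netAddr) Network() string { return a.network }
func (a netAddr) String() string  { return a.address }

type Instance interface {
	Address() net.Addr
	Weight() int
	Tag(key string) (value string, exist bool)
}

type instance struct {
	addr   net.Addr
	weight int
	tags   map[string]string
}

func (i *instance) Address() net.Addr { return i.addr }

// Weight returns the configured weight, or the default when it is not positive.
func (i *instance) Weight() int {
	if i.weight > 0 {
		return i.weight
	}
	return defaultWeight
}

// Tag reports the tag value and whether the key was present.
// Reading from a nil map is safe in Go, so nil tags simply report "not found".
func (i *instance) Tag(key string) (value string, exist bool) {
	value, exist = i.tags[key]
	return
}

// newInstance mirrors the shape of NewInstance quoted above.
func newInstance(network, address string, weight int, tags map[string]string) Instance {
	return &instance{addr: netAddr{network, address}, weight: weight, tags: tags}
}

func main() {
	ins := newInstance("tcp", "127.0.0.1:8888", 0, map[string]string{"zone": "a"})
	zone, ok := ins.Tag("zone")
	fmt.Println(ins.Address(), ins.Weight(), zone, ok)
}
```

Passing a weight of 0 (or a negative value) therefore does not disable an instance in this sketch; it only makes the instance use the default weight.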
Result
As mentioned above, the Resolve method obtains the service discovery result for a target key. Result holds that outcome: the instance list, which can and should be cached, and a CacheKey that maps to the instance list in the cache.
// Result contains the result of service discovery process.
// the instance list can/should be cached and CacheKey can be used to map the instance list in cache.
type Result struct {
	CacheKey  string
	Instances []Instance
}
Client middleware
Client middleware is defined under pkg/app/client/middlewares/client.
Discovery
Discovery uses a BalancerFactory to construct a middleware. It first applies the options we passed in through the Apply method; the available options are defined in pkg/app/client/middlewares/client/sd/options.go. It then copies the resolver, the load balancer and the load balancing options into lbConfig, passes lbConfig to NewBalancerFactory, and finally returns an anonymous function of type client.Middleware.
// Discovery will construct a middleware with BalancerFactory.
func Discovery(resolver discovery.Resolver, opts ...ServiceDiscoveryOption) client.Middleware {
	options := &ServiceDiscoveryOptions{
		Balancer: loadbalance.NewWeightedBalancer(),
		LbOpts:   loadbalance.DefaultLbOpts,
		Resolver: resolver,
	}
	options.Apply(opts)

	lbConfig := loadbalance.Config{
		Resolver: options.Resolver,
		Balancer: options.Balancer,
		LbOpts:   options.LbOpts,
	}
	f := loadbalance.NewBalancerFactory(lbConfig)

	return func(next client.Endpoint) client.Endpoint {
		// ...
	}
}
Implementation principle
The core of the service discovery middleware is the last part of Discovery, which we skipped above. The middleware rewrites the request's Host: when the request options are not nil and IsSD() returns true, it obtains an instance from the BalancerFactory and calls SetHost with that instance's address. If no instance can be obtained, the error is returned and the next endpoint is not called.
return func(ctx context.Context, req *protocol.Request, resp *protocol.Response) (err error) {
	if req.Options() != nil && req.Options().IsSD() {
		ins, err := f.GetInstance(ctx, req)
		if err != nil {
			return err
		}
		req.SetHost(ins.Address().String())
	}
	return next(ctx, req, resp)
}
Implementation analysis of service discovery
Periodic refresh
In practice, service discovery information changes frequently. Hertz uses the refresh method to update it periodically. A for range loop over time.Tick fires once every RefreshInterval, as set in the configuration. On each tick, the Range method of the cache (a map type from the sync package) traverses the cached key-value pairs and resolves each key again. If resolving a key fails, a warning is logged and the previously cached result is kept; otherwise the new result is stored, the expire counter is reset, and the balancer is rebalanced.
// refresh is used to update service discovery information periodically.
func (b *BalancerFactory) refresh() {
	for range time.Tick(b.opts.RefreshInterval) {
		b.cache.Range(func(key, value interface{}) bool {
			res, err := b.resolver.Resolve(context.Background(), key.(string))
			if err != nil {
				hlog.SystemLogger().Warnf("resolver refresh failed, key=%s error=%s", key, err.Error())
				return true
			}
			renameResultCacheKey(&res, b.resolver.Name())
			cache := value.(*cacheResult)
			cache.res.Store(res)
			atomic.StoreInt32(&cache.expire, 0)
			b.balancer.Rebalance(res)
			return true
		})
	}
}
Resolver cache
According to the comment on NewBalancerFactory, when a BalancerFactory with the same key already exists in the cache, it is fetched from the cache and reused. Let's briefly analyze the implementation. The resolver name, the balancer name and the load balancing options are passed to the cacheKey function to produce uniqueKey.
func cacheKey(resolver, balancer string, opts Options) string {
	return fmt.Sprintf("%s|%s|{%s %s}", resolver, balancer, opts.RefreshInterval, opts.ExpireInterval)
}
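Load then checks whether the map already holds an entry for uniqueKey. If it does, the existing BalancerFactory is returned directly. If it does not, a new BalancerFactory is created inside balancerFactoriesSfg.Do, its two background goroutines are started, and it is stored in the cache under uniqueKey.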
func NewBalancerFactory(config Config) *BalancerFactory {
	config.LbOpts.Check()
	uniqueKey := cacheKey(config.Resolver.Name(), config.Balancer.Name(), config.LbOpts)
	val, ok := balancerFactories.Load(uniqueKey)
	if ok {
		return val.(*BalancerFactory)
	}
	val, _, _ = balancerFactoriesSfg.Do(uniqueKey, func() (interface{}, error) {
		b := &BalancerFactory{
			opts:     config.LbOpts,
			resolver: config.Resolver,
			balancer: config.Balancer,
		}
		go b.watcher()
		go b.refresh()
		balancerFactories.Store(uniqueKey, b)
		return b, nil
	})
	return val.(*BalancerFactory)
}
Without this cache there would be a problem. Every time the middleware is initialized, two goroutines are started (watcher and refresh), and neither of them exits. If a user created a new client for every request, each one would start another pair, and the goroutines would leak.
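The effect of keyed reuse can be demonstrated with a small stdlib-only sketch. It is a simplification rather than the Hertz code: a plain mutex-guarded map replaces the combination of balancerFactories and balancerFactoriesSfg.Do, the factory type and getFactory are hypothetical names, and a counter records each point where the background goroutines would have been started.

```go
package main

import (
	"fmt"
	"sync"
)

// factory stands in for BalancerFactory; only the key matters in this sketch.
type factory struct{ key string }

var (
	mu        sync.Mutex
	factories = map[string]*factory{}
	launches  int // how many times background workers would have been started
)

// getFactory returns the cached factory for key, creating it at most once.
func getFactory(key string) *factory {
	mu.Lock()
	defer mu.Unlock()
	if f, ok := factories[key]; ok {
		return f // reuse: no new background workers
	}
	f := &factory{key: key}
	launches++ // in the quoted code, `go b.watcher()` and `go b.refresh()` start here
	factories[key] = f
	return f
}

// launchCount reports how many factories have started background workers.
func launchCount() int {
	mu.Lock()
	defer mu.Unlock()
	return launches
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ { // simulate 100 clients built with the same configuration
		wg.Add(1)
		go func() {
			defer wg.Done()
			getFactory("static|weight_random|{5s 15s}")
		}()
	}
	wg.Wait()
	fmt.Println("worker launches:", launchCount()) // worker launches: 1
}
```

With the cache in place, a hundred clients sharing one configuration cost a single pair of background workers instead of a hundred pairs.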
Summary
In this article, we looked at the interface definitions behind Hertz service discovery, the design of the client middleware, and why and how the implementation uses periodic refresh and a cache.
Finally, if this article was helpful to you, please like and share it. That is the greatest encouragement to me!