Kevin Wan

A load shedding protector against bursting requests

Why do we need load shedding?

In a microservice cluster the invocation chain is complex. A service provider needs a mechanism to protect itself from being overwhelmed by indiscriminate calls and to keep its own services highly available.

The most common protection mechanism is rate limiting. Using a rate limiter presupposes that you know the maximum concurrency the service can handle, which is usually obtained by stress testing before going live. However, each interface needs different limits, and as the system keeps iterating, its processing capacity changes. Stress testing and re-tuning the rate-limiting parameters before every release becomes very tedious.

So is there a simpler protection mechanism that still achieves maximum self-protection?

What is adaptive load shedding?

Adaptive load shedding is an intelligent way for a service to protect itself: it dynamically decides whether to shed requests based on the service's own system load.

Design goals:

  1. Ensure that the system is not dragged down.
  2. Maintain system throughput as long as the system remains stable.

The key question, then, is how to measure the load of the service itself.

Judging high load depends on two main indicators:

  1. Whether the CPU is overloaded.
  2. Whether the number of concurrent requests exceeds the maximum the service can handle.

When both conditions hold at the same time, the service is in a high load state and adaptive load shedding kicks in.

Note also that in high-concurrency scenarios the CPU load and the concurrency tend to fluctuate sharply. In the data these fluctuations show up as spikes (burrs), which could trigger load shedding far too often, so we generally use the average of each indicator over a period of time to smooth it out. This could be implemented by recording the exact metrics over the period and computing the average directly, but that consumes system resources.

A statistical algorithm, the exponential moving average, can estimate the local average of a variable so that each update depends on its historical values. The average can then be estimated without recording every historical value, which saves valuable server resources.

For the principle of the moving average algorithm, refer to the article listed in the references, which gives a very clear explanation.

Let $V_t$ denote the value of the variable $V$ at time $t$, and let $\theta_t$ be the raw value sampled at time $t$. Without the moving average model, $V_t = \theta_t$. With the moving average model, $V_t$ is updated by the following formula:

$$V_t = \beta \cdot V_{t-1} + (1 - \beta) \cdot \theta_t$$

  • $\beta = 0$: $V_t = \theta_t$
  • $\beta = 0.9$: approximately the average of the last 10 $\theta_t$ values
  • $\beta = 0.99$: approximately the average of the last 100 $\theta_t$ values
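
To make the update rule concrete, here is a minimal sketch in Go. The function names `ema` and `smooth` are made up for this illustration and are not part of go-zero; the sketch also seeds the average with the first sample, which is an assumption of this example.

```go
package main

// ema applies one exponential-moving-average step:
// next = beta*prev + (1-beta)*sample.
func ema(prev, sample, beta float64) float64 {
	return beta*prev + (1-beta)*sample
}

// smooth folds ema over a series, seeding the average with the first
// sample (an assumption of this sketch), and returns the smoothed value
// after each sample.
func smooth(samples []float64, beta float64) []float64 {
	out := make([]float64, 0, len(samples))
	var avg float64
	for i, s := range samples {
		if i == 0 {
			avg = s
		} else {
			avg = ema(avg, s, beta)
		}
		out = append(out, avg)
	}
	return out
}
```

Calling `smooth([]float64{10, 10, 100, 10}, 0.9)` shows the effect on a spike: the single sample of 100 only lifts the average to about 19, and the next sample pulls it back towards 10. With `beta` set to 0 the output equals the input, matching the first bullet above.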

Code Implementation

Next, let's look at the code implementation of adaptive load shedding in go-zero.

core/load/adaptiveshedder.go

Interface definition of the adaptive shedder:

// Promise is the callback handle returned for each admitted request
type Promise interface {
    // Called back when the request succeeds
    Pass()
    // Called back when the request fails
    Fail()
}

// Shedder is the load shedding interface
type Shedder interface {
    // Drop check
    // 1. If the call is allowed, the caller must invoke Promise.Pass() or Promise.Fail() to report the actual result
    // 2. If the call is rejected, it returns the overload error ErrServiceOverloaded
    Allow() (Promise, error)
}

The interface definition is very concise, which means it is very simple to use: it exposes a single `Allow() (Promise, error)` method to the outside world.

Example of go-zero usage.

The business code only needs to call this method to find out whether the request should be shed. If so, it ends the request immediately; otherwise it runs the business logic and finally reports the outcome through the returned Promise.

func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor {
    ensureSheddingStat()

    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler) (val interface{}, err error) {
        sheddingStat.IncrementTotal()
        var promise load.Promise
        // Check whether the request should be shed
        promise, err = shedder.Allow()
        // Shed the request and record the relevant logs and metrics
        if err != nil {
            metrics.AddDrop()
            sheddingStat.IncrementDrop()
            return
        }
        // Report the execution result when the handler returns
        defer func() {
            // Execution failed (deadline exceeded)
            if err == context.DeadlineExceeded {
                promise.Fail()
            // Execution succeeded
            } else {
                sheddingStat.IncrementPass()
                promise.Pass()
            }
        }()
        // Execute the business method
        return handler(ctx, req)
    }
}
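
The same Allow/Pass/Fail contract can be sketched outside of gRPC. The example below is self-contained and uses local stand-ins for the two interfaces. `capShedder`, `capPromise`, `errOverloaded` and `guard` are hypothetical names invented for this illustration, and `capShedder` is a plain fixed-limit shedder, not the adaptive one. Unlike the interceptor above, this sketch reports every error as a failure.

```go
package main

import (
	"errors"
	"sync/atomic"
)

// Local stand-ins mirroring the interfaces shown earlier.
type Promise interface {
	Pass()
	Fail()
}

type Shedder interface {
	Allow() (Promise, error)
}

// errOverloaded is a hypothetical error used only by this sketch.
var errOverloaded = errors.New("service overloaded")

// capShedder is a hypothetical shedder that rejects calls once a fixed
// number of them are in flight. It is NOT adaptive.
type capShedder struct {
	limit  int64
	flying int64
}

type capPromise struct{ s *capShedder }

// Both callbacks release the in-flight slot taken by Allow.
func (p capPromise) Pass() { atomic.AddInt64(&p.s.flying, -1) }
func (p capPromise) Fail() { atomic.AddInt64(&p.s.flying, -1) }

func (s *capShedder) Allow() (Promise, error) {
	if atomic.AddInt64(&s.flying, 1) > s.limit {
		atomic.AddInt64(&s.flying, -1)
		return nil, errOverloaded
	}
	return capPromise{s: s}, nil
}

// guard runs fn only if the shedder admits the call, and always reports
// the outcome through the promise.
func guard(s Shedder, fn func() error) error {
	promise, err := s.Allow()
	if err != nil {
		return err // shed: fn is never invoked
	}
	if err := fn(); err != nil {
		promise.Fail()
		return err
	}
	promise.Pass()
	return nil
}
```

The important point is that every admitted call ends in exactly one of `Pass()` or `Fail()`; otherwise the in-flight count never goes back down.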

Definition of the interface implementation.

There are three main groups of properties:

  1. CPU load threshold: exceeding this value means the CPU is under high load.
  2. Cooling period: if the service has shed load recently, it enters a cooling period. The purpose is to prevent pressure from being reapplied immediately while the load is still coming down, which would cause back-and-forth jitter. Because reducing the load takes some time, the service keeps checking whether the concurrency exceeds the limit during the cooling period and continues to discard requests if it does.
  3. Concurrency: the number of requests currently being processed, the average number of requests being processed, and the request count and response time over the most recent period. These are used to calculate whether the current concurrency is greater than the maximum concurrency the system can carry.

// functional option pattern
type ShedderOption func(opts *shedderOptions)

// Optional configuration parameters
type shedderOptions struct {
    // sliding time window size
    window time.Duration
    // number of buckets in the sliding time window
    buckets int
    // cpu load threshold
    cpuThreshold int64
}

// adaptiveShedder struct, implements the Shedder interface
type adaptiveShedder struct {
    // cpu load threshold
    // above this threshold the service is under high load and must shed requests to stay available
    cpuThreshold int64
    // number of buckets in 1s
    windows int64
    // number of requests currently in flight
    flying int64
    // moving average of the number of in-flight requests
    avgFlying float64
    // spin lock; one service shares one shedder
    // must be held when reading or updating the average in-flight count
    // a spin lock keeps this very short critical section cheap
    avgFlyingLock syncx.SpinLock
    // time of the last rejection
    dropTime *syncx.AtomicDuration
    // whether a request has been rejected recently
    droppedRecently *syncx.AtomicBool
    // request count statistics, with a sliding time window to record metrics for the most recent period
    passCounter *collection.RollingWindow
    // response time statistics, with a sliding time window to record metrics over the most recent period
    rtCounter *collection.RollingWindow
}

Adaptive shedder constructor.

func NewAdaptiveShedder(opts ...ShedderOption) Shedder {
    // Return a no-op implementation when the developer has disabled load shedding,
    // so that the calling code stays the same either way
    // go-zero uses this design in many places, such as Breaker and the logging component
    if !enabled.True() {
        return newNopShedder()
    }
    // options mode sets optional configuration parameters
    options := shedderOptions{
        // Default statistics for the last 5s
        window: defaultWindow,
        // default bucket count of 50
        buckets: defaultBuckets,
        // cpu load
        cpuThreshold: defaultCpuThreshold,
    }
    for _, opt := range opts {
        opt(&options)
    }
    // Calculate the duration of each bucket, default is 100ms
    bucketDuration := options.window / time.Duration(options.buckets)
    return &adaptiveShedder{
        // cpu load
        cpuThreshold: options.cpuThreshold,
        // how many buckets fit in 1s
        windows: int64(time.Second / bucketDuration),
        // last rejection time
        dropTime: syncx.NewAtomicDuration(),
        // whether it has been rejected recently
        droppedRecently: syncx.NewAtomicBool(),
        // qps statistics, sliding time window
        // ignore the bucket currently being written; its incomplete time span may skew the data
        passCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
            collection.IgnoreCurrentBucket()),
        // Response time statistics, sliding time window
        // Ignore the current writing window (bucket), incomplete time period may lead to data exceptions
        rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration,
            collection.IgnoreCurrentBucket()),
    }
}
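
As a small illustration of the functional option pattern and of the derived values, the sketch below applies options over the defaults mentioned in the comments (a 5s window split into 50 buckets). The types are local stand-ins, and `withWindow`, `withBuckets` and `derive` are names invented for this example rather than go-zero functions.

```go
package main

import "time"

// Local stand-ins for the option types shown above.
type shedderOptions struct {
	window  time.Duration
	buckets int
}

type ShedderOption func(opts *shedderOptions)

// withWindow and withBuckets are illustrative option constructors.
func withWindow(window time.Duration) ShedderOption {
	return func(opts *shedderOptions) { opts.window = window }
}

func withBuckets(buckets int) ShedderOption {
	return func(opts *shedderOptions) { opts.buckets = buckets }
}

// derive applies the options over the defaults described in the article
// (5s window, 50 buckets) and returns the duration of one bucket in
// milliseconds plus the number of buckets that fit in one second.
func derive(opts ...ShedderOption) (bucketMs, bucketsPerSecond int64) {
	options := shedderOptions{window: 5 * time.Second, buckets: 50}
	for _, opt := range opts {
		opt(&options)
	}
	bucketDuration := options.window / time.Duration(options.buckets)
	return bucketDuration.Milliseconds(), int64(time.Second / bucketDuration)
}
```

With no options, `derive()` yields 100ms buckets and 10 buckets per second, which is where the `windows` field gets its default value. Passing `withBuckets(100)` halves the bucket duration to 50ms and doubles the buckets per second to 20.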

Drop check Allow().

Check whether the current request should be dropped. If it is, the business side must abort the request immediately to protect the service; this also means that load shedding has taken effect and the cooling period begins. If the request is admitted, a promise is returned, and the business side is expected to call it back so that the metrics can be recorded.

// Drop check
func (as *adaptiveShedder) Allow() (Promise, error) {
    // Check whether the request should be dropped
    if as.shouldDrop() {
        // Record the drop time
        as.dropTime.Set(timex.Now())
        // Mark that a request was dropped recently
        as.droppedRecently.Set(true)
        // Return the overload error
        return nil, ErrServiceOverloaded
    }
    // Add 1 to the number of requests being processed
    as.addFlying(1)
    // Each admitted request gets a new promise object
    // The promise holds a pointer to the shedder internally
    return &promise{
        start: timex.Now(),
        shedder: as,
    }, nil
}

Drop decision shouldDrop().

// whether the request should be dropped or not
func (as *adaptiveShedder) shouldDrop() bool {
    // Either the current cpu load exceeds the threshold,
    // or the service is still in its cooling period and should keep checking the load
    if as.systemOverloaded() || as.stillHot() {
        // Check whether the concurrency being processed exceeds the maximum concurrency the system can currently carry
        // Discard the request if it does
        if as.highThru() {
            flying := atomic.LoadInt64(&as.flying)
            as.avgFlyingLock.Lock()
            avgFlying := as.avgFlying
            as.avgFlyingLock.Unlock()
            msg := fmt.Sprintf(
                "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
                stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
            logx.Error(msg)
            stat.Report(msg)
            return true
        }
    }
    return false
}
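
Stripped of logging and locking, the decision reduces to a small boolean expression. The sketch below restates it over a plain value type so each input is visible. `loadSnapshot` is a hypothetical type invented for this example, and the numbers in its fields carry no particular unit.

```go
package main

// loadSnapshot is a hypothetical value type holding the signals the drop
// check consults; the real shedder reads them from live counters.
type loadSnapshot struct {
	cpuUsage     int64   // smoothed CPU usage
	cpuThreshold int64   // usage at or above this counts as overloaded
	coolingOff   bool    // a request was dropped within the cooling period
	flying       int64   // requests in flight right now
	avgFlying    float64 // moving average of in-flight requests
	maxFlight    int64   // estimated concurrency the service can sustain
}

// shouldDrop mirrors the structure of the check above: the concurrency
// test only matters when the CPU is overloaded or the service is still
// cooling off, and both the average and the instantaneous in-flight
// counts must exceed the sustainable maximum.
func (s loadSnapshot) shouldDrop() bool {
	overloaded := s.cpuUsage >= s.cpuThreshold
	if !overloaded && !s.coolingOff {
		return false
	}
	return int64(s.avgFlying) > s.maxFlight && s.flying > s.maxFlight
}
```

Requiring both `avgFlying` and `flying` to exceed the limit means a momentary burst with a low average is not shed, and neither is a high average once the in-flight count has already fallen.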

CPU threshold check systemOverloaded().

The CPU load value is smoothed with the moving average algorithm to suppress spikes. It is sampled every 250ms with β = 0.95, which is roughly equivalent to the average of the last 20 CPU load samples, i.e. a time period of about 5s.

// whether the cpu is overloaded
func (as *adaptiveShedder) systemOverloaded() bool {
    return systemOverloadChecker(as.cpuThreshold)
}

// cpu checker function
var systemOverloadChecker = func(cpuThreshold int64) bool {
    return stat.CpuUsage() >= cpuThreshold
}

// cpu moving average, recomputed on every sample (excerpt)
curUsage := internal.RefreshCpu()
prevUsage := atomic.LoadInt64(&cpuUsage)
// cpu = cpuᵗ⁻¹ * beta + cpuᵗ * (1 - beta)
// moving average algorithm
usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))
atomic.StoreInt64(&cpuUsage, usage)
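
One consequence of this smoothing is that a sustained jump in raw CPU usage takes a number of samples to show up in the smoothed value. The sketch below wraps the same update in a function and counts those samples. `refresh` and `samplesUntil` are hypothetical helpers for this illustration, and the usage values (500, 1000, threshold 900) are assumed numbers chosen only for the example.

```go
package main

import "sync/atomic"

// beta is the smoothing factor quoted in the text above.
const beta = 0.95

// cpuUsage holds the smoothed usage; it is read and written atomically.
var cpuUsage int64

// refresh folds one raw sample into the smoothed value using the same
// integer-truncating update as the excerpt above.
func refresh(curUsage int64) int64 {
	prevUsage := atomic.LoadInt64(&cpuUsage)
	usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))
	atomic.StoreInt64(&cpuUsage, usage)
	return usage
}

// samplesUntil reports how many consecutive raw samples it takes for the
// smoothed usage to reach threshold when starting from start, or -1 if
// that does not happen within limit samples.
func samplesUntil(start, raw, threshold int64, limit int) int {
	atomic.StoreInt64(&cpuUsage, start)
	for n := 1; n <= limit; n++ {
		if refresh(raw) >= threshold {
			return n
		}
	}
	return -1
}
```

With these assumed numbers, a single saturated sample does not trip the threshold at all, while sustained saturation needs a little over 30 samples, which is several seconds at one sample every 250ms. This is the intended trade-off: short spikes are ignored at the cost of reacting slightly later to a real overload.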

Check whether the system is in its cooling period: stillHot().

Determine whether the system is currently in a cooling period. If it is, the shedder should keep checking whether requests need to be dropped. The main purpose is to prevent pressure from being reapplied while the system is still recovering from overload and the load has not yet come down, which would cause back-and-forth jitter. During this time the shedder should continue trying to discard requests.

func (as *adaptiveShedder) stillHot() bool {
    // No requests have been dropped recently,
    // which means the service is fine
    if !as.droppedRecently.True() {
        return false
    }
    // No drop time recorded, so not in the cooling period
    dropTime := as.dropTime.Load()
    if dropTime == 0 {
        return false
    }
    // cooling period defaults to 1s
    hot := timex.Since(dropTime) < coolOffDuration
    // the cooling period is over, requests are processed normally again
    if !hot {
        // reset the drop record
        as.droppedRecently.Set(false)
    }

    return hot
}
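
The timeline is easier to follow with explicit timestamps. The sketch below is a single-goroutine simplification that takes the current time as a parameter instead of reading a clock. `coolOff`, `markDrop` and the millisecond fields are hypothetical names for this illustration, and the zero-drop-time check of the real method is left out.

```go
package main

// coolOff is a hypothetical single-goroutine simplification of the
// droppedRecently/dropTime pair; all times are milliseconds on one clock.
type coolOff struct {
	periodMs   int64
	dropped    bool
	dropTimeMs int64
}

// markDrop records that a request was shed at nowMs.
func (c *coolOff) markDrop(nowMs int64) {
	c.dropped = true
	c.dropTimeMs = nowMs
}

// stillHot reports whether nowMs falls inside the cooling period that
// started at the last drop, clearing the flag once the period is over.
func (c *coolOff) stillHot(nowMs int64) bool {
	if !c.dropped {
		return false
	}
	hot := nowMs-c.dropTimeMs < c.periodMs
	if !hot {
		c.dropped = false
	}
	return hot
}
```

For a 1000ms period and a drop at t=100ms, `stillHot(600)` is true and `stillHot(1100)` is false. Every new drop calls `markDrop` again, so the period keeps being extended for as long as requests are still being shed.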

Check the concurrency currently being processed: highThru().

Once the concurrency currently being processed exceeds the concurrency the system can carry, the service enters the load shedding state.

Why do we need a lock here? Because the adaptive shedder is shared globally, and the lock ensures that the concurrency average is read and updated correctly.

Why a spin lock? Because the critical section only reads or updates a single value, a spin lock avoids blocking other goroutines on a heavier lock, which improves performance under high concurrency.

func (as *adaptiveShedder) highThru() bool {
    // locking
    as.avgFlyingLock.Lock()
    // Get the sliding average
    // update at the end of each request
    avgFlying := as.avgFlying
    // Unlock
    as.avgFlyingLock.Unlock()
    // Maximum concurrency the system can sustain at this point
    maxFlight := as.maxFlight()
    // Both the average concurrency and the current concurrency must exceed the system's maximum concurrency
    return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight
}

How do we obtain the current concurrency and the average concurrency?

The current concurrency count is very simple: add 1 for each admitted request and subtract 1 in the promise callback once the request has completed. The average concurrency is then computed with the moving average algorithm.

type promise struct {
    // request start time
    // used to measure the time spent processing the request
    start time.Duration
    shedder *adaptiveShedder
}

func (p *promise) Fail() {
    // End of request, -1 for the number of requests currently being processed
    p.shedder.addFlying(-1)
}

func (p *promise) Pass() {
    // response time in milliseconds
    rt := float64(timex.Since(p.start)) / float64(time.Millisecond)
    // end of request, number of requests currently being processed -1
    p.shedder.addFlying(-1)
    p.shedder.rtCounter.Add(math.Ceil(rt))
    p.shedder.passCounter.Add(1)
}

func (as *adaptiveShedder) addFlying(delta int64) {
    flying := atomic.AddInt64(&as.flying, delta)
    // After the request ends, count the concurrency of requests currently being processed
    if delta < 0 {
        as.avgFlyingLock.Lock()
        // Estimate the average number of requests for the current service over the recent time period
        as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta)
        as.avgFlyingLock.Unlock()
    }
}

Knowing the current concurrency is not enough; we also need to know the maximum concurrency the system can handle.

The number of passed requests and the response time are both tracked with a sliding window, whose implementation is covered in the adaptive circuit breaker article.

Maximum concurrency of the current system = maximum number of passed requests per unit time within the window × minimum response time within the window.

// Calculate the maximum concurrency the system can sustain
// max concurrency = max requests per second (qps) * min response time (rt)
func (as *adaptiveShedder) maxFlight() int64 {
    // windows = buckets per second
    // maxQPS = maxPASS * windows
    // minRT = min average response time in milliseconds
    // maxQPS * minRT / milliseconds_per_second
    // as.maxPass() * as.windows - highest request count of any bucket * number of buckets in 1s
    // as.minRt()/1e3 - smallest average response time of any bucket in the window, divided by 1000 to convert ms to seconds
    return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))
}

// Sliding time window with multiple buckets
// find the bucket with the highest number of requests
// each bucket spans interval ms
// qps is the number of requests in 1s: qps = maxPass * time.Second/interval
func (as *adaptiveShedder) maxPass() int64 {
    var result float64 = 1
    // The bucket with the highest number of requests in the current time window
    as.passCounter.Reduce(func(b *collection.Bucket) {
        if b.Sum > result {
            result = b.Sum
        }
    })

    return int64(result)
}

// Sliding time window with multiple buckets
// Calculate the minimum average response time
// because it is necessary to calculate the maximum number of concurrency the system can handle in a recent period of time
func (as *adaptiveShedder) minRt() float64 {
    // default is 1000ms
    result := defaultMinRt

    as.rtCounter.Reduce(func(b *collection.Bucket) {
        if b.Count <= 0 {
            return
        }
        // request average response time
        avg := math.Round(b.Sum / float64(b.Count))
        if avg < result {
            result = avg
        }
    })

    return result
}
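
A worked example makes the units clearer. The sketch below recomputes the same estimate over a plain slice instead of a rolling window. The `bucket` type and the free function `maxFlight` are simplified stand-ins written for this illustration, and the traffic numbers in the note that follows are invented.

```go
package main

import "math"

// bucket is a simplified stand-in for one slot of the rolling window.
type bucket struct {
	passes int64   // requests completed in this bucket
	rtSum  float64 // sum of their response times in milliseconds
}

// maxFlight estimates the sustainable concurrency as
// (peak requests per second) * (best average response time in seconds),
// using the same defaults as the code above: a peak of at least 1 request
// per bucket and a response time of at most 1000ms.
func maxFlight(buckets []bucket, bucketsPerSecond int64) int64 {
	maxPass := int64(1)
	minRt := 1000.0
	for _, b := range buckets {
		if b.passes > maxPass {
			maxPass = b.passes
		}
		if b.passes > 0 {
			if avg := math.Round(b.rtSum / float64(b.passes)); avg < minRt {
				minRt = avg
			}
		}
	}
	return int64(math.Max(1, float64(maxPass*bucketsPerSecond)*(minRt/1e3)))
}
```

Suppose the busiest 100ms bucket saw 100 requests and the fastest bucket averaged 20ms per request. With 10 buckets per second the peak rate is 1000 requests per second, and 1000 × 0.02s gives a sustainable concurrency of 20. With an empty window the defaults apply, giving 1 × 10 × 1s = 10.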

Reference

Google BBR Congestion Control Algorithm

Principle of sliding average algorithm

go-zero adaptive load shedding

Project address

https://github.com/zeromicro/go-zero

Feel free to use go-zero and star to support us!
