
Homayoon Alimohammadi

Posted on • Originally published at Medium

Circuit breakers (in Go)

Circuit breakers are a basic yet vital part of a system, especially if you are dealing with a microservice architecture. Recently one of my colleagues was wondering what these ‘proxy-like’ pieces of code do, and while explaining it to him, I realized that although I kind of knew what a circuit breaker is at a high level, I didn’t really know what goes on under the hood. So I decided to dig deeper and share what I’ve found. In this article we will learn about circuit breakers at a high level and walk through a basic implementation of the circuit breaker pattern in Go.

A simple depiction of the circuit breaker pattern

Introduction

Imagine “Service A” is trying to contact “Service B” (as the server) via an RPC. From the standpoint of Service A, it looks something like this:

    package service_A

    import (
      "context"
      "log"
    )

    // callServiceB makes a plain RPC to Service B, with no protection at all.
    func callServiceB(req Request) {
      resp, err := serviceB.SomeMethod(context.Background(), req)
      if err != nil {
        log.Printf("got error: %s", err.Error())
        return
      }

      log.Printf("got response: %v", resp)
    }

Well, this is probably the most basic way to make a call to another service, but there are a number of things that can (and will) go wrong. Let’s say we did not receive any response from Service B. What are the possible reasons for that? One of them (which is not necessarily our concern right now) is that, since we’re mostly dealing with unreliable network connections, it’s quite possible that the request never reached Service B. In this case we don’t rely solely on TCP retransmission; in fact, most RPC frameworks provide their own retry mechanisms, so this is mostly handled in the application layer. But the fact that there was no response from the server does not always mean that there was a network issue; maybe there is something wrong with Service B itself.

In general, services (even the most reliable ones) tend to get slow or unavailable from time to time. This might result in errors in response to some (or all) of the incoming calls. If a service is really busy processing lots of requests, we might want to limit our request rate to that service, or even stop sending any requests for a while, letting it take a breath and do whatever it was doing in peace.

In another case, let’s say the service was unavailable for a while and our previously failed requests (let’s say 100 of them) are being retried one after another. If we don’t limit the number of our future requests to that service and make an additional 100 requests, then when the service eventually recovers it will be faced with 200 requests at the same time, which may even cause it to crash once again.

Although in the real world most RPCs are made with a specific timeout and won’t be retried afterwards, it is still a good practice not to flood a service with requests while it has problems responding.
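
To make the “specific timeout, no retries” idea concrete, here is a minimal sketch using the standard context package. The names slowCall, callWithTimeout and isTimeout are made up for illustration; slowCall only simulates a slow Service B and is not a real RPC.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCall is a hypothetical stand-in for an RPC to Service B: it answers
// after `latency`, unless the context is cancelled first.
func slowCall(ctx context.Context, latency time.Duration) (string, error) {
	select {
	case <-time.After(latency):
		return "ok", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// callWithTimeout bounds a single call by `timeout` and does not retry.
func callWithTimeout(timeout, latency time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return slowCall(ctx, latency)
}

// isTimeout reports whether the call gave up because the deadline passed.
func isTimeout(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	// The service needs 200ms but we only wait 20ms: we still pay the
	// full 20ms before learning that the call failed.
	_, err := callWithTimeout(20*time.Millisecond, 200*time.Millisecond)
	fmt.Println("timed out:", isTimeout(err))

	// A healthy service answers well within the timeout.
	resp, err := callWithTimeout(200*time.Millisecond, time.Millisecond)
	fmt.Println(resp, err)
}
```

Note that every failed call still costs the whole timeout, which is the waiting a circuit breaker lets us skip.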

Another upside of this limiting mechanism is that we don’t bother making any requests to that service for a while, hence no time-consuming I/O. Failing immediately is definitely faster than waiting for a while and then receiving a failed result (like a 5xx status code).

Circuit Breakers

In simple terms, an RPC between two services is like drawing 2 straight lines between the two. One line sends the request from service A to service B, and the other returns the response from service B to service A. But in order to implement that “limiting” policy, we need to have some kind of a middle-man which decides whether to direct a request to the destination or not.

This middle-man, proxy (not a network proxy) or wrapper either keeps the circuit (or connection) between the two services “Closed”, or stops one from calling the other, hence “Opening” the circuit.

The main idea behind a circuit breaker is as follows:

By default the circuit is in the Closed state and you are free to make calls to the destination. After a certain number of failed responses from the destination (a threshold, let’s say 5), the breaker stops you from making any further requests for a while (a back-off time, like 30 seconds), during which the circuit is considered Open. Once that interval is over, the circuit goes into the Half-Open state. The next request then determines whether we end up in the Closed state or go back to the Open state: if it succeeds, the circuit is closed, but if it fails we are back in the Open state and have to wait for another back-off interval.

State transfer mechanism of a circuit breaker
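
The transitions described above can be written down as a small pure function. This is only a sketch of the state machine, not the implementation that follows; the Event type and the exported names are assumptions made for the example.

```go
package main

import "fmt"

type State string

const (
	Closed   State = "closed"
	Open     State = "open"
	HalfOpen State = "half-open"
)

// Event is anything that can move the breaker to another state.
type Event string

const (
	Success        Event = "success"         // a call succeeded
	Failure        Event = "failure"         // a call failed
	BackoffElapsed Event = "backoff-elapsed" // the back-off interval is over
)

// next returns the state that follows `s` when `ev` happens. `fails` is the
// number of failures counted in the closed state (including this one) and
// `threshold` is the number of failures that opens the circuit.
func next(s State, ev Event, fails, threshold int) State {
	switch s {
	case Closed:
		if ev == Failure && fails >= threshold {
			return Open
		}
		return Closed
	case Open:
		if ev == BackoffElapsed {
			return HalfOpen
		}
		return Open
	case HalfOpen:
		switch ev {
		case Success:
			return Closed
		case Failure:
			return Open
		}
		return HalfOpen
	}
	return s
}

func main() {
	fmt.Println(next(Closed, Failure, 5, 5))          // open
	fmt.Println(next(Open, BackoffElapsed, 0, 5))     // half-open
	fmt.Println(next(HalfOpen, Success, 0, 5))        // closed
	fmt.Println(next(HalfOpen, Failure, 0, 5))        // open
}
```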

Let’s see how a simple circuit breaker is implemented in Go.

Implementation

All the code you see below can be found in this GitHub repo as well.

From the standpoint of the service that uses a circuit breaker to make requests to another service, making the RPC looks a bit different:

    package service_A

    func callServiceB(cb circuitbreaker.Circuitbreaker, req Request) {
      r, err := cb.Execute(func() (interface{}, error) {
        return serviceB.SomeMethod(context.Background(), req)
      })
      })
      if err != nil {
        log.Printf("got error: %s", err.Error())
        return
      } else {
        resp, ok := r.(serviceB.Response)
        if !ok {
          log.Println("type assertion failed")
          return
        }

        log.Printf("got response: %s", resp)
      }
    }
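
The type assertion in callServiceB is the price of Execute returning interface{}. With Go 1.18+ generics, a small helper could hide it. The sketch below is not part of the original code: Executor, Do and passThrough are hypothetical names, and Executor simply assumes the Execute signature used above.

```go
package main

import (
	"errors"
	"fmt"
)

// Executor is the assumed shape of the circuit breaker used above.
type Executor interface {
	Execute(req func() (interface{}, error)) (interface{}, error)
}

// Do is a hypothetical generic helper that runs fn through the breaker
// and converts the result back to T, so callers skip the type assertion.
func Do[T any](cb Executor, fn func() (T, error)) (T, error) {
	var zero T
	r, err := cb.Execute(func() (interface{}, error) { return fn() })
	if err != nil {
		return zero, err
	}
	v, ok := r.(T)
	if !ok {
		return zero, fmt.Errorf("unexpected result type %T", r)
	}
	return v, nil
}

// passThrough is a stand-in breaker that always lets the call through.
type passThrough struct{}

func (passThrough) Execute(req func() (interface{}, error)) (interface{}, error) {
	return req()
}

var errBoom = errors.New("boom")

func main() {
	s, err := Do(passThrough{}, func() (string, error) { return "pong", nil })
	fmt.Println(s, err)

	_, err = Do(passThrough{}, func() (int, error) { return 0, errBoom })
	fmt.Println(err)
}
```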

Let’s take a look at what cb.Execute does:

    package circuitbreaker

    func (cb *circuitbreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
     err := cb.doPreRequest()
     if err != nil {
      return nil, err
     }

     res, err := req()

     err = cb.doPostRequest(err)
     if err != nil {
      return nil, err
     }

     return res, nil
    }

The actual request happens at res, err := req(). Before that, there should be some pre-request logic, which is what doPreRequest does:

    func (cb *circuitbreaker) doPreRequest() error {
     if cb.state == open {
      return ErrRefuse
     }

     return nil
    }

This checks whether the current state is open. If the circuit is open, simply return an error indicating that the request is refused. Let’s look at the structure of our circuit breaker so far:

    type State string

    const (
     open     State = "open"
     closed   State = "closed"
     halfOpen State = "half-open"
    )

    type circuitbreaker struct {
     state State
    }
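
The definition of ErrRefuse is not shown here. Assuming it is a plain sentinel error, a caller could tell “refused by the breaker” apart from “failed at the destination” with errors.Is, roughly as in the sketch below. The error message, fetchGreeting and the fallback value are all made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrRefuse is assumed to be a sentinel error; its message here is invented.
var ErrRefuse = errors.New("circuit breaker: request refused, circuit is open")

// fetchGreeting is a hypothetical caller. `execute` stands in for a call
// that goes through cb.Execute.
func fetchGreeting(execute func() (string, error)) string {
	resp, err := execute()
	switch {
	case errors.Is(err, ErrRefuse):
		// Refused locally: no network call was made, so serve a fallback.
		return "cached greeting"
	case err != nil:
		// The destination was reached (or tried) and the call failed.
		return "error: " + err.Error()
	default:
		return resp
	}
}

func main() {
	fmt.Println(fetchGreeting(func() (string, error) { return "hello", nil }))
	fmt.Println(fetchGreeting(func() (string, error) { return "", ErrRefuse }))
	// errors.Is also matches when the sentinel has been wrapped with %w.
	fmt.Println(fetchGreeting(func() (string, error) {
		return "", fmt.Errorf("calling service B: %w", ErrRefuse)
	}))
}
```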

Fine, what about the post-request logic, doPostRequest?

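    func (cb *circuitbreaker) doPostRequest(err error) error {
     cb.mutex.Lock()
     defer cb.mutex.Unlock()

     if err == nil {
      if cb.policy == MaxConsecutiveFails {
       cb.fails = 0 // a success breaks the streak
      }
      // A success from a call that started before the circuit opened
      // must not cut the waiting interval short.
      if cb.state != open {
       cb.state = closed
      }
      return nil
     }

     // The waiting interval is already running: a late failure must not
     // trigger it again, since the send below would block while the
     // mutex is held and the watcher is asleep.
     if cb.state == open {
      return err
     }

     if cb.state == halfOpen {
      // the trial request failed: reopen and wait for another interval
      cb.state = open
      cb.openChannel <- struct{}{}
      return err
     }

     cb.fails++
     if cb.failsExcceededThreshod() {
      cb.state = open
      cb.openChannel <- struct{}{}
     }

     return err
    }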

This function introduces a bunch of new fields and methods. Let’s describe the logic behind it and then expand our circuit breaker structure.

  • mutex makes sure that read-modify-write cycles are done safely, in case there are concurrent attempts to modify the circuit breaker state.

  • err is the actual error that the destination returned. If it is nil, basically reset the state and go on. fails is the number of failed requests in the current closed state. MaxConsecutiveFails is a policy which means that the circuit breaker must open the circuit after n consecutive fails.

  • If there was an error and we were in the half-open state, open the circuit and go on. The cb.openChannel <- struct{}{} send triggers the waiting interval.

  • If there was an error while the circuit was closed, simply increment the number of failed attempts in the current state.

  • Check whether the number of failed attempts has reached the threshold for your policy. If so, open the circuit and trigger the waiting interval.
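
To see how the two policies differ, the sketch below replays the same sequence of call outcomes under each of them. It mirrors only the counting logic of doPostRequest; tripsAfter is a hypothetical helper written for this example.

```go
package main

import "fmt"

type Policy int

const (
	MaxFails Policy = iota
	MaxConsecutiveFails
)

// tripsAfter replays call outcomes (true = success) against a closed
// circuit and returns the 1-based index of the call that opens it,
// or 0 if the circuit never opens.
func tripsAfter(policy Policy, threshold int, outcomes []bool) int {
	fails := 0
	for i, ok := range outcomes {
		if ok {
			if policy == MaxConsecutiveFails {
				fails = 0 // a success breaks the streak
			}
			continue
		}
		fails++
		if fails >= threshold {
			return i + 1
		}
	}
	return 0
}

func main() {
	// fail, fail, success, fail, fail, fail
	outcomes := []bool{false, false, true, false, false, false}

	fmt.Println(tripsAfter(MaxFails, 3, outcomes))            // 4
	fmt.Println(tripsAfter(MaxConsecutiveFails, 3, outcomes)) // 6
}
```

With MaxFails the success in the middle does not reset anything, so the third failure overall opens the circuit. With MaxConsecutiveFails the count starts over after the success, and the circuit only opens after three failures in a row.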

Let’s take a look at the complete structure of our circuit breaker:

    type Policy int
    type State string

    const (
     // MaxFails specifies the maximum non-consecutive fails which are allowed
     // in the "Closed" state before the state is changed to "Open".
     MaxFails Policy = iota

     // MaxConsecutiveFails specifies the maximum consecutive fails which are allowed
     // in the "Closed" state before the state is changed to "Open".
     MaxConsecutiveFails
    )

    const (
     open     State = "open"
     closed   State = "closed"
     halfOpen State = "half-open"
    )

    type circuitbreaker struct {
     policy              Policy
     maxFails            uint64
     maxConsecutiveFails uint64
     openInterval        time.Duration

     // fails is the number of failed requests for the current "Closed" state.
     // It is reset when the circuit moves from open to half-open (and on
     // every success under the MaxConsecutiveFails policy).
     fails uint64

     // current state of the circuit
     state State

     // openChannel handles the event transfer mechanism for the open state
     openChannel chan struct{}

     mutex sync.Mutex
    }

And the helper functions we’ve seen so far:

    func (cb *circuitbreaker) failsExcceededThreshod() bool {
     switch cb.policy {
     case MaxConsecutiveFails:
      return cb.fails >= cb.maxConsecutiveFails
     case MaxFails:
      return cb.fails >= cb.maxFails
     default:
      return false
     }
    }

    func (cb *circuitbreaker) openWatcher() {
     for range cb.openChannel {
      time.Sleep(cb.openInterval)
      cb.mutex.Lock()
      cb.state = halfOpen
      cb.fails = 0
      cb.mutex.Unlock()
     }
    }

The openWatcher simply listens on the openChannel and, upon receiving, sleeps for the openInterval, then changes the state to half-open and resets the number of fails. The cycle then repeats. But when is the openWatcher started? Right when we initialize our circuit breaker:

    type ExtraOptions struct {
     // Policy determines how the fails should be incremented
     Policy Policy

     // MaxFails specifies the maximum non-consecutive fails which are allowed
     // in the "Closed" state before the state is changed to "Open".
     MaxFails *uint64

     // MaxConsecutiveFails specifies the maximum consecutive fails which are allowed
     // in the "Closed" state before the state is changed to "Open".
     MaxConsecutiveFails *uint64

     OpenInterval *time.Duration
    }

    func New(opts ...ExtraOptions) Circuitbreaker {
     var opt ExtraOptions
     if len(opts) > 0 {
      opt = opts[0]
     }

     if opt.MaxFails == nil {
      opt.MaxFails = literal.ToPointer(uint64(5))
     }

     if opt.MaxConsecutiveFails == nil {
      opt.MaxConsecutiveFails = literal.ToPointer(uint64(5))
     }

     if opt.OpenInterval == nil {
      opt.OpenInterval = literal.ToPointer(5 * time.Second)
     }

     cb := &circuitbreaker{
      policy:              opt.Policy,
      maxFails:            *opt.MaxFails,
      maxConsecutiveFails: *opt.MaxConsecutiveFails,
      openInterval:        *opt.OpenInterval,
      openChannel:         make(chan struct{}),
     }

     go cb.openWatcher()

     return cb
    }
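
The literal.ToPointer helper comes from the repo and is not shown in this article. A generic helper of that kind presumably looks something like the sketch below (Go 1.18+); orDefault is a hypothetical companion that captures the “use the default when the option is nil” pattern from New.

```go
package main

import (
	"fmt"
	"time"
)

// ToPointer returns a pointer to a copy of v. This is a guess at what
// literal.ToPointer does, not its actual source.
func ToPointer[T any](v T) *T {
	return &v
}

// orDefault dereferences p, or returns def when p is nil.
func orDefault[T any](p *T, def T) T {
	if p == nil {
		return def
	}
	return *p
}

func main() {
	var unset *uint64
	fmt.Println(orDefault(unset, 5))                  // option not set: default
	fmt.Println(orDefault(ToPointer(uint64(10)), 5)) // option set: its value
	fmt.Println(*ToPointer(160 * time.Millisecond))
}
```

Pointer fields let New distinguish “not set” from an explicit zero value, which a plain uint64 or time.Duration field cannot do.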

Let’s try what we’ve written in the simplest way possible:

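    package main

    import (
     "errors"
     "fmt"
     "log"
     "math/rand"
     "sync"
     "time"
     // plus the circuitbreaker and literal packages from the repo mentioned above
    )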

    func main() {
     cbOpts := circuitbreaker.ExtraOptions{
      Policy:              circuitbreaker.MaxFails,
      MaxFails:            literal.ToPointer(uint64(5)),
      MaxConsecutiveFails: literal.ToPointer(uint64(5)),
      OpenInterval:        literal.ToPointer(160 * time.Millisecond),
     }
     cb := circuitbreaker.New(cbOpts)
     wg := &sync.WaitGroup{}
     for i := 1; i < 100; i += 1 {
      wg.Add(1)
      go makeServiceCall(i, cb, wg)
      time.Sleep(10 * time.Millisecond)
     }

     log.Println("sent all the requests")
     wg.Wait()
     log.Println("got all the responses, exiting.")
    }

    func serviceMethod(id int) (string, error) {
     if val := rand.Float64(); val <= 0.3 {
      return "", errors.New("failed")
     }
     return fmt.Sprintf("[id: %d] done.", id), nil
    }

    func makeServiceCall(id int, cb circuitbreaker.Circuitbreaker, wg *sync.WaitGroup) {
     defer wg.Done()

     resp, err := cb.Execute(func() (interface{}, error) {
      return serviceMethod(id)
     })
     if err != nil {
      log.Printf("[id %d] got err: %s", id, err.Error())
     } else {
      log.Printf("[id %d] success: %s", id, resp)
     }
    }

Feel free to experiment with different values for policy, MaxFails, MaxConsecutiveFails and openInterval.
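
Another thing worth experimenting with is the open-interval mechanism itself. The openWatcher goroutine runs for the whole lifetime of the breaker, and the send on openChannel happens while the mutex is held. One possible variation, sketched below under simplifying assumptions, drops the goroutine and the channel and instead records when the circuit was opened. This is not the article’s implementation: Breaker, NewBreaker and fakeClock are hypothetical names, and only a consecutive-failure policy is modelled.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	ErrRefuse = errors.New("circuit open: request refused")
	errDown   = errors.New("service down") // used by the demo below
)

// Breaker is a simplified variant that tracks consecutive failures only.
type Breaker struct {
	mu           sync.Mutex
	maxFails     int
	openInterval time.Duration
	fails        int
	open         bool
	openedAt     time.Time
	now          func() time.Time // injectable clock, handy for tests
}

func NewBreaker(maxFails int, openInterval time.Duration) *Breaker {
	return &Breaker{maxFails: maxFails, openInterval: openInterval, now: time.Now}
}

func (b *Breaker) Execute(req func() (interface{}, error)) (interface{}, error) {
	b.mu.Lock()
	// Open and still inside the interval: refuse without calling req.
	if b.open && b.now().Sub(b.openedAt) < b.openInterval {
		b.mu.Unlock()
		return nil, ErrRefuse
	}
	b.mu.Unlock()

	res, err := req() // closed, or open with the interval elapsed (half-open)

	b.mu.Lock()
	defer b.mu.Unlock()
	if err == nil {
		b.open, b.fails = false, 0
		return res, nil
	}
	b.fails++
	// Any failure reported while the circuit is marked open (typically the
	// half-open trial call) restarts the interval; otherwise the circuit
	// opens once the threshold is reached.
	if b.open || b.fails >= b.maxFails {
		b.open, b.openedAt, b.fails = true, b.now(), 0
	}
	return nil, err
}

// fakeClock lets the demo move time forward without sleeping.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clk := &fakeClock{}
	b := NewBreaker(2, 30*time.Second)
	b.now = clk.Now

	fail := func() (interface{}, error) { return nil, errDown }
	ok := func() (interface{}, error) { return "pong", nil }

	b.Execute(fail)
	b.Execute(fail) // second consecutive failure opens the circuit
	_, err := b.Execute(ok)
	fmt.Println(errors.Is(err, ErrRefuse)) // refused while open

	clk.Advance(30 * time.Second) // interval over: next call is the trial
	res, err := b.Execute(ok)
	fmt.Println(res, err)
}
```

The trade-off is that the half-open state becomes implicit (open, but with the interval elapsed), and, like the version above, nothing here limits how many trial calls run concurrently.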

Conclusion

So far we’ve seen how circuit breakers play a vital role in reliability and performance. Without circuit breakers, the client might waste lots of time trying to connect to an unavailable service, waiting for the whole timeout period each time. The target service, on the other hand, might be bombarded with requests right after recovering from downtime or a disaster, leading to another unfortunate outage. We’ve also walked through a simple implementation of the circuit breaker pattern in Go. Needless to say, I’d love to know your opinion on this subject, as well as any other comments or suggestions you might have. I learn a lot from reading your comments and from the corrections or additional information you provide in them.
