Bhavesh Praveen

Posted on • Originally published at bhaveshpraveen.Medium

Implementing circuit breaker pattern from scratch in Python

We’ll briefly look into circuit breaker pattern before jumping to code.

What is circuit breaking?

In real-world applications, services go down and come back up all the time (or they might just stay down). Whenever you make a remote call (an HTTP request or an RPC) to another service, there is a chance that the call will fail. The idea behind circuit breaking is that after a certain number of failed remote calls, we stop making them and return a cached response or an error instead. After a certain delay, we allow one remote call through to the failing service. If it succeeds, we allow subsequent remote calls again. If it fails, we keep returning a cached response or an error and make no remote calls to that service for some more time.

When all services are working and remote calls return without errors, the circuit is in the "Closed" state.

When remote calls keep failing and we stop making any more calls to the failing service, the circuit is in the "Open" state.

After a certain delay, when we make a trial remote call to the failing service, the state transitions from "Open" to "Half Open". If that call succeeds, the state transitions from "Half Open" to "Closed" and subsequent remote calls are allowed. If it fails, the state transitions from "Half Open" back to "Open", and we wait for a certain period of time before making the next trial call (again in the "Half Open" state).
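The three states and the transitions between them can be sketched as a small lookup table. This is only an illustration: the `State` enum, the event names, and the `next_state` helper are hypothetical and are not part of the implementation built later in this post.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# (current state, event) -> next state; the event names are made up for this sketch
TRANSITIONS = {
    (State.CLOSED, "threshold_reached"): State.OPEN,
    (State.OPEN, "delay_elapsed"): State.HALF_OPEN,
    (State.HALF_OPEN, "call_succeeded"): State.CLOSED,
    (State.HALF_OPEN, "call_failed"): State.OPEN,
}


def next_state(state, event):
    """Return the state that follows `event`, or stay put if the event does not apply."""
    return TRANSITIONS.get((state, event), state)


state = State.CLOSED
for event in ("threshold_reached", "delay_elapsed", "call_failed",
              "delay_elapsed", "call_succeeded"):
    state = next_state(state, event)
    print(f"{event} -> {state.value}")
```

The walk above trips the breaker, fails one trial call, and then recovers on the second trial, ending back in the Closed state.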

[Image: State Transition Diagram]

To know more, read this and this


Why do you need it?

  • Prevents a network or service failure from cascading to other services.
  • Saves bandwidth by not making requests over the network when the service you're requesting is down.
  • Gives the failing service time to recover or start back up.

Code Marathon

Let's now try to build a simple circuit-breaker using Python

Disclaimer: This is in no way production ready. There are some excellent libraries that are available online and well tested. I've mentioned two of them here: circuit-breaker and pybreaker.

Let's first decide on the API for the circuit breaker that we are going to build and define its expected behavior.

I'm a big fan of the retry library's syntax, so let's try to use that here. We can adapt our implementation to this API towards the end of the blog post.

def circuit_breaker(exceptions=(Exception,), threshold=5, delay=60):
    """Returns a circuit breaker decorator.

    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param threshold: the number of failed attempts before changing the state to Open
    :param delay: delay in seconds between Open and Half Open state
    :returns: a circuit_breaker decorator.
    """

The decorator would then be used like this:

@circuit_breaker(exceptions=Exception, threshold=5, delay=60)
def make_api_call(url, data):
  # function that makes an api-request to another server/application
  pass

Let's define all the possible states

# circuit_breaker.py
class StateChoices:
    OPEN = "open"
    CLOSED = "closed"
    HALF_OPEN = "half_open"

Let's create a class that handles all of the circuit breaker logic.

# circuit_breaker.py
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

class CircuitBreaker:
    def __init__(self, func, exceptions, threshold, delay):
        """
        :param func: method that makes the remote call
        :param exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions)
        :param threshold: number of failed attempts before the state is changed to "Open"
        :param delay: delay in seconds between "Open" and "Half-Open" state
        """
        self.func = func
        self.exceptions_to_catch = exceptions
        self.threshold = threshold
        self.delay = delay

        # by default set the state to closed
        self.state = StateChoices.CLOSED


        self.last_attempt_timestamp = None
        # keep track of the failed attempt count
        self._failed_attempt_count = 0

    def update_last_attempt_timestamp(self):
        self.last_attempt_timestamp = datetime.utcnow().timestamp()

    def set_state(self, state):
        """To track the state changes by logging the information"""
        prev_state = self.state
        self.state = state
        logging.info(f"Changed state from {prev_state} to {self.state}")

    def handle_closed_state(self, *args, **kwargs):
        pass

    def handle_open_state(self, *args, **kwargs):
        pass

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)

Constructor takes the following parameters

  • func - method/function that makes the remote call
  • exceptions - an exception or a tuple of exceptions to catch (ideally should be network exceptions)
  • threshold - number of failed attempts before the state is changed to "Open"
  • delay - delay in seconds between "Open" and "Half-Open" state

make_remote_call takes the parameters that the underlying remote call needs (func)

If it seems confusing, please take a look at the following snippet

def make_request(url):
  print(f"Url is {url}")

obj = CircuitBreaker(make_request, exceptions=(Exception,), threshold=5, delay=10)
obj.make_remote_call(url="www.google.com")

make_request is passed to the CircuitBreaker class as a first-class function. The parameters required by make_request are passed through make_remote_call.

Let's now try to complete handle_closed_state and handle_open_state

# circuit_breaker.py
class RemoteCallFailedException(Exception):
    pass

class CircuitBreaker:

    def handle_closed_state(self, *args, **kwargs):
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            logging.info("Success: Remote call")
            self.update_last_attempt_timestamp()
            return ret_val
        except allowed_exceptions as e:
            # remote call has failed
            logging.info("Failure: Remote call")
            # increment the failed attempt count
            self._failed_attempt_count += 1

            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()

            # if the failed attempt count has reached the threshold
            # then change the state to OPEN
            if self._failed_attempt_count >= self.threshold:
                self.set_state(StateChoices.OPEN)
            # raise RemoteCallFailedException, chained to the original error
            raise RemoteCallFailedException from e


    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)

handle_closed_state makes the remote call. If it succeeds, we update last_attempt_timestamp and return the result of the remote call. If the remote call fails, _failed_attempt_count is incremented and last_attempt_timestamp is updated. If _failed_attempt_count has not reached the threshold, we simply raise an exception. If _failed_attempt_count is greater than or equal to the threshold, we change the state to Open before raising the exception.
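To see the counting logic in isolation, here is a minimal sketch that mirrors the Closed-state behaviour described above without any networking. `FailureCounter` and `always_fails` are hypothetical stand-ins and are not part of circuit_breaker.py; the sketch also raises a plain `RuntimeError` instead of the custom exception.

```python
class FailureCounter:
    """Hypothetical, stripped-down stand-in for the Closed-state bookkeeping."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.failed_attempt_count = 0
        self.state = "closed"

    def call(self, func, *args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            self.failed_attempt_count += 1
            # trip the breaker once the count reaches the threshold
            if self.failed_attempt_count >= self.threshold:
                self.state = "open"
            raise RuntimeError("remote call failed") from e


def always_fails():
    # stand-in for a remote call to a service that is down
    raise ConnectionError("service is down")


counter = FailureCounter(threshold=3)
states = []
for _ in range(3):
    try:
        counter.call(always_fails)
    except RuntimeError:
        pass
    states.append(counter.state)

print(states)  # ['closed', 'closed', 'open']
```

The state only flips on the third failure, because the comparison is "greater than or equal to the threshold" rather than "greater than".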

# circuit_breaker.py
class CircuitBreaker:

    def handle_open_state(self, *args, **kwargs):
        current_timestamp = datetime.utcnow().timestamp()
        # if `delay` seconds have not elapsed since the last attempt, raise an exception
        if self.last_attempt_timestamp + self.delay >= current_timestamp:
            raise RemoteCallFailedException(f"Retry after {self.last_attempt_timestamp+self.delay-current_timestamp} secs")

        # after `delay` seconds have elapsed since the last attempt, try making the remote call
        # update the state to half open state
        self.set_state(StateChoices.HALF_OPEN)
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            # the remote call was successful
            # now reset the state to Closed
            self.set_state(StateChoices.CLOSED)
            # reset the failed attempt counter
            self._failed_attempt_count = 0
            # update the last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # return the remote call's response
            return ret_val
        except allowed_exceptions as e:
            # the remote call failed again
            # increment the failed attempt count
            self._failed_attempt_count += 1

            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()

            # set the state to "OPEN"
            self.set_state(StateChoices.OPEN)

            # raise the error
            raise RemoteCallFailedException from e

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)

handle_open_state first checks whether delay seconds have elapsed since the last attempt to make a remote call. If not, it raises an exception. If delay seconds have elapsed since the last attempt, we change the state to "Half Open" and try to make one remote call to the failing service. If the remote call succeeds, we change the state to "Closed", reset _failed_attempt_count to 0, and return the response of the remote call. If the remote call fails while in the "Half Open" state, the state is set back to "Open" and we raise an exception.
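The delay check is easier to experiment with if the clock is injectable, so that the Open to Half-Open transition can be exercised without actually waiting. The sketch below is a simplified, hypothetical variant: `OpenStateGate`, `FakeClock`, and the `clock` parameter are assumptions made for illustration and do not appear in circuit_breaker.py.

```python
class FakeClock:
    """Hypothetical clock whose time only moves when we tell it to."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


class OpenStateGate:
    """Sketch of the Open-state check: reject calls until `delay` seconds have passed."""

    def __init__(self, delay, clock):
        self.delay = delay
        self.clock = clock
        self.state = "open"
        self.last_attempt_timestamp = clock()

    def call(self, func):
        now = self.clock()
        if self.last_attempt_timestamp + self.delay >= now:
            remaining = self.last_attempt_timestamp + self.delay - now
            raise RuntimeError(f"Retry after {remaining} secs")
        # the delay has elapsed: allow exactly one trial call (Half-Open)
        self.state = "half_open"
        self.last_attempt_timestamp = now
        try:
            result = func()
        except Exception:
            self.state = "open"  # trial failed, go back to Open
            raise
        self.state = "closed"  # trial succeeded, resume normal calls
        return result


clock = FakeClock()
gate = OpenStateGate(delay=10, clock=clock)

try:
    gate.call(lambda: "ok")
except RuntimeError as e:
    print(e)  # still inside the delay window, so the call is rejected

clock.now = 11  # pretend 11 seconds have passed
print(gate.call(lambda: "ok"), gate.state)
```

Passing the clock in as a callable is one design option among several; it keeps the timing logic testable without sleeping.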

Complete code

# circuit_breaker.py

import functools
import http
import logging
from datetime import datetime

import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


class StateChoices:
    OPEN = "open"
    CLOSED = "closed"
    HALF_OPEN = "half_open"


class RemoteCallFailedException(Exception):
    pass


class CircuitBreaker:
    def __init__(self, func, exceptions, threshold, delay):
        """
        :param func: method that makes the remote call
        :param exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions)
        :param threshold: number of failed attempts before the state is changed to "Open"
        :param delay: delay in seconds between "Open" and "Half-Open" state
        """
        self.func = func
        self.exceptions_to_catch = exceptions
        self.threshold = threshold
        self.delay = delay

        # by default set the state to closed
        self.state = StateChoices.CLOSED


        self.last_attempt_timestamp = None
        # keep track of the failed attempt count
        self._failed_attempt_count = 0

    def update_last_attempt_timestamp(self):
        self.last_attempt_timestamp = datetime.utcnow().timestamp()

    def set_state(self, state):
        prev_state = self.state
        self.state = state
        logging.info(f"Changed state from {prev_state} to {self.state}")

    def handle_closed_state(self, *args, **kwargs):
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            logging.info("Success: Remote call")
            self.update_last_attempt_timestamp()
            return ret_val
        except allowed_exceptions as e:
            # remote call has failed
            logging.info("Failure: Remote call")
            # increment the failed attempt count
            self._failed_attempt_count += 1

            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()

            # if the failed attempt count has reached the threshold
            # then change the state to OPEN
            if self._failed_attempt_count >= self.threshold:
                self.set_state(StateChoices.OPEN)
            # raise RemoteCallFailedException, chained to the original error
            raise RemoteCallFailedException from e

    def handle_open_state(self, *args, **kwargs):
        current_timestamp = datetime.utcnow().timestamp()
        # if `delay` seconds have not elapsed since the last attempt, raise an exception
        if self.last_attempt_timestamp + self.delay >= current_timestamp:
            raise RemoteCallFailedException(f"Retry after {self.last_attempt_timestamp+self.delay-current_timestamp} secs")

        # after `delay` seconds have elapsed since the last attempt, try making the remote call
        # update the state to half open state
        self.set_state(StateChoices.HALF_OPEN)
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            # the remote call was successful
            # now reset the state to Closed
            self.set_state(StateChoices.CLOSED)
            # reset the failed attempt counter
            self._failed_attempt_count = 0
            # update the last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # return the remote call's response
            return ret_val
        except allowed_exceptions as e:
            # the remote call failed again
            # increment the failed attempt count
            self._failed_attempt_count += 1

            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()

            # set the state to "OPEN"
            self.set_state(StateChoices.OPEN)

            # raise the error
            raise RemoteCallFailedException from e

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)

Now to test it out. Let's create a mock server.

Install Flask and requests. IPython is optional.

pip install requests
pip install Flask
pip install ipython 

Let's create some endpoints to mock the server

# main.py

import random
import time

from flask import Flask
app = Flask(__name__)


@app.route('/success')
def success_endpoint():
    return {
        "msg": "Call to this endpoint was a smashing success."
    }, 200


@app.route('/failure')
def faulty_endpoint():
    r = random.randint(0, 1)
    if r == 0:
        time.sleep(2)

    return {
        "msg": "I will fail."
    }, 500


@app.route('/random')
def fail_randomly_endpoint():
    r = random.randint(0, 1)
    if r == 0:
        return {
            "msg": "Success msg"
        }, 200

    return {
        "msg": "I will fail (sometimes)."
    }, 500

Run the development server

export FLASK_APP=main.py; flask run

By default it runs on port 5000

You can use these snippets to exercise the circuit breaker against the mock server.

# snippets.py

import http

import requests

faulty_endpoint = "http://localhost:5000/failure"
success_endpoint = "http://localhost:5000/success"
random_status_endpoint = "http://localhost:5000/random"

def make_request(url):
    try:
        response = requests.get(url, timeout=0.3)
        if response.status_code == http.HTTPStatus.OK:
            print(f"Call to {url} succeed with status code = {response.status_code}")
            return response
        if 500 <= response.status_code < 600:
            print(f"Call to {url} failed with status code = {response.status_code}")
            raise Exception("Server Issue")
    except Exception:
        print(f"Call to {url} failed")
        raise

Start an IPython session and try it out:

(circuit-breaker) ➜  circuit-breaker git:(master) ✗ ipython

In [1]: from circuit_breaker import CircuitBreaker

In [2]: from snippets import make_request, faulty_endpoint, success_endpoint

In [3]: obj = CircuitBreaker(make_request, exceptions=(Exception,), threshold=5, delay=10)

In [4]: obj.make_remote_call(success_endpoint)
Call to http://localhost:5000/success succeed with status code = 200
06:07:51,255 INFO: Success: Remote call
Out[4]: <Response [200]>

In [5]: obj.make_remote_call(success_endpoint)
Call to http://localhost:5000/success succeed with status code = 200
06:07:53,610 INFO: Success: Remote call
Out[5]: <Response [200]>

In [6]: vars(obj)
Out[6]:
{'func': <function snippets.make_request(url)>,
 'exceptions_to_catch': (Exception,),
 'threshold': 5,
 'delay': 10,
 'state': 'closed',
 'last_attempt_timestamp': 1607800073.610199,
 '_failed_attempt_count': 0}

Lines 1 and 2 are just imports. In line 3, we create a CircuitBreaker object for make_request. Here we're setting exceptions=(Exception,), which catches all exceptions. Ideally we should narrow this down to the exceptions we actually want to catch (in this case, network exceptions), but we'll leave it as is for this demo.
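To illustrate why narrowing matters, the sketch below uses a hypothetical `guarded_call` helper, with Python's built-in `ConnectionError` and `TimeoutError` standing in for real network errors. With requests, a comparable tuple could be built from the classes in `requests.exceptions`, although make_request as written raises a plain Exception for 5xx responses and would need to raise one of the listed types instead.

```python
# built-in exceptions used as stand-ins for network errors (an assumption of this sketch)
NETWORK_ERRORS = (ConnectionError, TimeoutError)

failures = 0


def guarded_call(func, exceptions=NETWORK_ERRORS):
    """Count only the listed exception types as remote-call failures."""
    global failures
    try:
        return func()
    except exceptions:
        failures += 1
        raise


def service_down():
    # simulates a remote service that refuses the connection
    raise ConnectionError("connection refused")


def buggy_caller():
    # a programming error on our side, not a network failure
    return 1 / 0


for func in (service_down, buggy_caller):
    try:
        guarded_call(func)
    except Exception as e:
        print(type(e).__name__, "-> failures =", failures)
```

Only the ConnectionError is counted. The ZeroDivisionError still propagates to the caller, but it does not push the breaker towards the Open state.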

Now make successive calls to the faulty endpoint.

In [7]: obj.make_remote_call(faulty_endpoint)

In [8]: obj.make_remote_call(faulty_endpoint)

In [9]: obj.make_remote_call(faulty_endpoint)

In [10]: obj.make_remote_call(faulty_endpoint)

In [11]: obj.make_remote_call(faulty_endpoint)

In [12]: obj.make_remote_call(faulty_endpoint)
---------------------------------------------------------------------------
Traceback data ..........

RemoteCallFailedException: Retry after 8.688776969909668 secs  

In [13]: obj.make_remote_call(success_endpoint)
---------------------------------------------------------------------------
Traceback data......

RemoteCallFailedException: Retry after 6.096494913101196 secs


Try to make these calls as quickly as possible. After the first five calls to the faulty_endpoint, the next call (line 12) will not make an API request to the Flask server. Instead, it will raise an exception asking you to retry after a specified number of seconds. Even a call to success_endpoint (line 13) will still raise an error, because the circuit is in the "Open" state.

Now, after the delay has elapsed, if we make a call to the faulty endpoint, the state will transition from Open to Half-Open and then back to Open.

In [18]: obj.make_remote_call(faulty_endpoint)
06:21:24,959 INFO: Changed state from open to half_open
...
06:21:24,964 INFO: Changed state from half_open to open

After the delay has elapsed again, if we make a call to the success_endpoint, the state will transition from Open to Half-Open and then to Closed.

In [19]: obj.make_remote_call(success_endpoint)
06:25:10,673 INFO: Changed state from open to half_open
...
06:25:10,678 INFO: Changed state from half_open to closed
Out[19]: <Response [200]>

Now we have a working circuit breaker. We could introduce response caching and monitoring, and make it thread-safe. Errors could be handled better, and more exception types could help. All of these features are left as an exercise for the reader.
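As one possible starting point for the caching idea, the sketch below wraps a function so that its last successful result is served when a later call fails. Everything here is hypothetical (`with_cached_fallback`, `flaky`, and the single-slot cache); a real version would sit around make_remote_call and would need to decide how long a cached value stays valid.

```python
import functools


def with_cached_fallback(func):
    """Return the last successful result of `func` when a later call raises."""
    cache = {}  # single slot; ignores call arguments to keep the sketch short

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            cache["last"] = func(*args, **kwargs)
        except Exception:
            if "last" not in cache:
                raise  # nothing cached yet, so surface the error
        return cache["last"]

    return wrapper


calls = {"n": 0}


@with_cached_fallback
def flaky():
    # succeeds on the first call, fails on every call after that
    calls["n"] += 1
    if calls["n"] > 1:
        raise ConnectionError("service is down")
    return "fresh response"


print(flaky())  # first call succeeds and fills the cache
print(flaky())  # second call fails, so the cached value is returned
```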

Finally, improving the API shouldn't take a lot of time. I've added a quick and dirty version here.

# circuit_breaker.py

class APICircuitBreaker:
    def __init__(self, exceptions=(Exception,), threshold=5, delay=60):
        self.obj = functools.partial(
            CircuitBreaker,
            exceptions=exceptions,
            threshold=threshold,
            delay=delay
        )

    def __call__(self, func):
        self.obj = self.obj(func=func)

        # functools.wraps keeps the decorated function's name and docstring
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return self.obj.make_remote_call(*args, **kwargs)

        return wrapper

    def __getattr__(self, item):
        return getattr(self.obj, item)


circuit_breaker = APICircuitBreaker

The decorated version of make_request then looks like this:

# snippets.py

import http

import requests

from circuit_breaker import circuit_breaker


@circuit_breaker()
def make_request(url):
    try:
        response = requests.get(url, timeout=0.3)
        if response.status_code == http.HTTPStatus.OK:
            print(f"Call to {url} succeed with status code = {response.status_code}")
            return response
        if 500 <= response.status_code < 600:
            print(f"Call to {url} failed with status code = {response.status_code}")
            raise Exception("Server Issue")
    except Exception:
        print(f"Call to {url} failed")
        raise

All code samples can be found here


Connect with me on Twitter

References:

  1. https://dzone.com/articles/circuit-breaker-pattern
  2. https://medium.com/@narengowda/what-is-circuitbreaking-in-microservices-2053f4f66882
  3. https://martinfowler.com/bliki/CircuitBreaker.html
  4. https://microservices.io/patterns/reliability/circuit-breaker.html
