We’ll briefly look into the circuit breaker pattern before jumping into the code.
What is circuit breaking?
In real-world applications, services go down and come back up all the time (or they might just stay down). Whenever you make a remote call (HTTP request/RPC) to another service, there is a chance that the call will fail. The idea behind circuit breaking is that after a certain number of failed remote calls, we stop making the calls and return a cached response or an error instead. After a certain delay, we allow one remote call to the failing service. If it succeeds, we allow subsequent remote calls again. If it fails, we keep returning a cached response or an error and make no remote calls to the failing service for some more time.
When all services are working and remote calls return without errors, the circuit is in the "Closed" state.
When remote calls keep failing and we stop making any more calls to the failing service, the circuit is in the "Open" state.
After a certain delay, we make one remote call to the failing service, and the state transitions from "Open" to "Half Open". If that call succeeds, the state transitions from "Half Open" to "Closed" and subsequent remote calls are allowed. If it fails, the state transitions from "Half Open" back to "Open", and we wait for a certain period before making the next trial call (in the "Half Open" state).
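The three states and the moves between them can be written down as a small lookup table. This is only a sketch to make the transitions concrete: the event names (threshold_reached, delay_elapsed, call_succeeded, call_failed) and the next_state helper are made up for illustration and are not part of the implementation built later in this post.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# (current state, event) -> next state. Event names are hypothetical.
TRANSITIONS = {
    (State.CLOSED, "threshold_reached"): State.OPEN,
    (State.OPEN, "delay_elapsed"): State.HALF_OPEN,
    (State.HALF_OPEN, "call_succeeded"): State.CLOSED,
    (State.HALF_OPEN, "call_failed"): State.OPEN,
}


def next_state(state, event):
    """Return the state that follows `event`; stay put if no transition applies."""
    return TRANSITIONS.get((state, event), state)
```

Any pair that is not in the table, such as a successful call while the circuit is already Closed, leaves the state unchanged.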
State Transition Diagram: image src
To know more, read this and this
Why do you need it?
- To prevent a network or service failure from cascading to other services.
- To save bandwidth by not making requests over the network when the service you’re requesting is down.
- To give the failing service time to recover or start back up.
Code Marathon
Let's now try to build a simple circuit-breaker using Python
Disclaimer: This is in no way production ready. There are some excellent libraries that are available online and well tested. I've mentioned two of them here: circuit-breaker and pybreaker.
Let's first decide on the API for the circuit breaker that we are going to build and define its expected behavior.
I'm a big fan of the retry library's syntax, so let's try to use that here. We can adapt our implementation to this API towards the end of the blog post.
def circuit_breaker(exceptions=(Exception,), threshold=5, delay=60):
    """Returns a circuit_breaker decorator.
    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param threshold: the number of failed attempts before changing the state to Open
    :param delay: delay in seconds before an Open circuit allows a trial call (Half Open state)
    :returns: a circuit_breaker decorator.
    """

@circuit_breaker(exceptions=Exception, threshold=5, delay=60)
def make_api_call(url, data):
    # function that makes an api-request to another server/application
    pass
Let's define all the possible states
# circuit_breaker.py
class StateChoices:
    OPEN = "open"
    CLOSED = "closed"
    HALF_OPEN = "half_open"
Let's create a class that handles all of the circuit breaker logic.
# circuit_breaker.py
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

class CircuitBreaker:
    def __init__(self, func, exceptions, threshold, delay):
        """
        :param func: method that makes the remote call
        :param exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions)
        :param threshold: number of failed attempts before the state is changed to "Open"
        :param delay: delay in seconds before an "Open" circuit allows a trial call ("Half-Open" state)
        """
        self.func = func
        self.exceptions_to_catch = exceptions
        self.threshold = threshold
        self.delay = delay
        # by default set the state to closed
        self.state = StateChoices.CLOSED
        self.last_attempt_timestamp = None
        # keep track of failed attempt count
        self._failed_attempt_count = 0

    def update_last_attempt_timestamp(self):
        self.last_attempt_timestamp = datetime.utcnow().timestamp()

    def set_state(self, state):
        """To track the state changes by logging the information"""
        prev_state = self.state
        self.state = state
        logging.info(f"Changed state from {prev_state} to {self.state}")

    def handle_closed_state(self, *args, **kwargs):
        pass

    def handle_open_state(self, *args, **kwargs):
        pass

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)
The constructor takes the following parameters:
- func: method/function that makes the remote call
- exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions)
- threshold: number of failed attempts before the state is changed to "Open"
- delay: delay in seconds before an "Open" circuit allows a trial call ("Half-Open" state)
make_remote_call takes the parameters that the underlying remote call (func) needs.
If it seems confusing, please take a look at the following snippet
def make_request(url):
    print(f"Url is {url}")

obj = CircuitBreaker(make_request, exceptions=(Exception,), threshold=5, delay=10)
obj.make_remote_call(url="www.google.com")
make_request is passed as a first-class function to the CircuitBreaker class. The params required by make_request are sent through make_remote_call.
Let's now try to complete handle_closed_state and handle_open_state.
# circuit_breaker.py
class RemoteCallFailedException(Exception):
    pass

class CircuitBreaker:
    def handle_closed_state(self, *args, **kwargs):
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            logging.info("Success: Remote call")
            self.update_last_attempt_timestamp()
            return ret_val
        except allowed_exceptions as e:
            # remote call has failed
            logging.info("Failure: Remote call")
            # increment the failed attempt count
            self._failed_attempt_count += 1
            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # if the failed attempt count has reached the threshold
            # then change the state to OPEN
            if self._failed_attempt_count >= self.threshold:
                self.set_state(StateChoices.OPEN)
            # raise our own exception, chained to the original one
            raise RemoteCallFailedException from e

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)
handle_closed_state makes the remote call. If it succeeds, we update last_attempt_timestamp and return the result of the remote call. If the remote call fails, _failed_attempt_count is incremented. If _failed_attempt_count has not reached the threshold, we simply raise an exception. If _failed_attempt_count is greater than or equal to the threshold, we change the state to Open and then raise the exception.
# circuit_breaker.py
class CircuitBreaker:
    def handle_open_state(self, *args, **kwargs):
        current_timestamp = datetime.utcnow().timestamp()
        # if `delay` seconds have not elapsed since the last attempt, raise an exception
        if self.last_attempt_timestamp + self.delay >= current_timestamp:
            raise RemoteCallFailedException(f"Retry after {self.last_attempt_timestamp+self.delay-current_timestamp} secs")
        # after `delay` seconds have elapsed since the last attempt, try making the remote call
        # update the state to half open state
        self.set_state(StateChoices.HALF_OPEN)
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            # the remote call was successful
            # now reset the state to Closed
            self.set_state(StateChoices.CLOSED)
            # reset the failed attempt counter
            self._failed_attempt_count = 0
            # update the last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # return the remote call's response
            return ret_val
        except allowed_exceptions as e:
            # the remote call failed again
            # increment the failed attempt count
            self._failed_attempt_count += 1
            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # set the state to "OPEN"
            self.set_state(StateChoices.OPEN)
            # raise the error
            raise RemoteCallFailedException from e

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)
handle_open_state first checks whether delay seconds have elapsed since the last attempt to make a remote call. If not, it raises an exception. If delay seconds have elapsed since the last attempt, we change the state to "Half Open" and try to make one remote call to the failing service. If the remote call is successful, we change the state to "Closed", reset _failed_attempt_count to 0 and return the response of the remote call. If the remote call fails while in the "Half Open" state, the state is set back to "Open" and we raise an exception.
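The delay check boils down to simple arithmetic on timestamps. Here is a small sketch that isolates it so it can be tried with fixed numbers instead of the real clock. The helper names cooldown_remaining and is_call_blocked are made up for this illustration, and the timestamps are arbitrary.

```python
def cooldown_remaining(last_attempt_timestamp, delay, now):
    """Seconds left until a trial call is allowed (negative once the delay has passed)."""
    return last_attempt_timestamp + delay - now


def is_call_blocked(last_attempt_timestamp, delay, now):
    """Same comparison as in handle_open_state: block until the delay has fully elapsed."""
    return cooldown_remaining(last_attempt_timestamp, delay, now) >= 0


# Last failed attempt at t=100s with delay=10s
print(is_call_blocked(100.0, 10, 103.0))  # still blocked, 7 seconds remain
print(is_call_blocked(100.0, 10, 111.0))  # delay elapsed, a trial call may go through
```

Passing the current time in as an argument, rather than reading the clock inside the function, is what makes the check easy to test.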
Complete code
# circuit_breaker.py
import functools
import http
import logging
from datetime import datetime

import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)

class StateChoices:
    OPEN = "open"
    CLOSED = "closed"
    HALF_OPEN = "half_open"

class RemoteCallFailedException(Exception):
    pass

class CircuitBreaker:
    def __init__(self, func, exceptions, threshold, delay):
        """
        :param func: method that makes the remote call
        :param exceptions: an exception or a tuple of exceptions to catch (ideally should be network exceptions)
        :param threshold: number of failed attempts before the state is changed to "Open"
        :param delay: delay in seconds before an "Open" circuit allows a trial call ("Half-Open" state)
        """
        self.func = func
        self.exceptions_to_catch = exceptions
        self.threshold = threshold
        self.delay = delay
        # by default set the state to closed
        self.state = StateChoices.CLOSED
        self.last_attempt_timestamp = None
        # keep track of failed attempt count
        self._failed_attempt_count = 0

    def update_last_attempt_timestamp(self):
        self.last_attempt_timestamp = datetime.utcnow().timestamp()

    def set_state(self, state):
        prev_state = self.state
        self.state = state
        logging.info(f"Changed state from {prev_state} to {self.state}")

    def handle_closed_state(self, *args, **kwargs):
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            logging.info("Success: Remote call")
            self.update_last_attempt_timestamp()
            return ret_val
        except allowed_exceptions as e:
            # remote call has failed
            logging.info("Failure: Remote call")
            # increment the failed attempt count
            self._failed_attempt_count += 1
            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # if the failed attempt count has reached the threshold
            # then change the state to OPEN
            if self._failed_attempt_count >= self.threshold:
                self.set_state(StateChoices.OPEN)
            # raise our own exception, chained to the original one
            raise RemoteCallFailedException from e

    def handle_open_state(self, *args, **kwargs):
        current_timestamp = datetime.utcnow().timestamp()
        # if `delay` seconds have not elapsed since the last attempt, raise an exception
        if self.last_attempt_timestamp + self.delay >= current_timestamp:
            raise RemoteCallFailedException(f"Retry after {self.last_attempt_timestamp+self.delay-current_timestamp} secs")
        # after `delay` seconds have elapsed since the last attempt, try making the remote call
        # update the state to half open state
        self.set_state(StateChoices.HALF_OPEN)
        allowed_exceptions = self.exceptions_to_catch
        try:
            ret_val = self.func(*args, **kwargs)
            # the remote call was successful
            # now reset the state to Closed
            self.set_state(StateChoices.CLOSED)
            # reset the failed attempt counter
            self._failed_attempt_count = 0
            # update the last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # return the remote call's response
            return ret_val
        except allowed_exceptions as e:
            # the remote call failed again
            # increment the failed attempt count
            self._failed_attempt_count += 1
            # update last_attempt_timestamp
            self.update_last_attempt_timestamp()
            # set the state to "OPEN"
            self.set_state(StateChoices.OPEN)
            # raise the error
            raise RemoteCallFailedException from e

    def make_remote_call(self, *args, **kwargs):
        if self.state == StateChoices.CLOSED:
            return self.handle_closed_state(*args, **kwargs)
        if self.state == StateChoices.OPEN:
            return self.handle_open_state(*args, **kwargs)
Now to test it out. Let's create a mock server.
Install Flask and requests. IPython is optional.
pip install requests
pip install Flask
pip install ipython
Let's create some endpoints to mock the server
# main.py
import random
import time

from flask import Flask

app = Flask(__name__)

@app.route('/success')
def success_endpoint():
    return {
        "msg": "Call to this endpoint was a smashing success."
    }, 200

@app.route('/failure')
def faulty_endpoint():
    r = random.randint(0, 1)
    if r == 0:
        time.sleep(2)
    return {
        "msg": "I will fail."
    }, 500

@app.route('/random')
def fail_randomly_endpoint():
    r = random.randint(0, 1)
    if r == 0:
        return {
            "msg": "Success msg"
        }, 200
    return {
        "msg": "I will fail (sometimes)."
    }, 500
Run the development server
export FLASK_APP=main.py; flask run
By default it runs on port 5000
You can use these snippets to try it out.
# snippets.py
import http

import requests

faulty_endpoint = "http://localhost:5000/failure"
success_endpoint = "http://localhost:5000/success"
random_status_endpoint = "http://localhost:5000/random"

def make_request(url):
    try:
        response = requests.get(url, timeout=0.3)
        if response.status_code == http.HTTPStatus.OK:
            print(f"Call to {url} succeed with status code = {response.status_code}")
            return response
        if 500 <= response.status_code < 600:
            print(f"Call to {url} failed with status code = {response.status_code}")
            raise Exception("Server Issue")
    except Exception:
        print(f"Call to {url} failed")
        raise
(circuit-breaker) ➜ circuit-breaker git:(master) ✗ ipython
In [1]: from circuit_breaker import CircuitBreaker
In [2]: from snippets import make_request, faulty_endpoint, success_endpoint
In [3]: obj = CircuitBreaker(make_request, exceptions=(Exception,), threshold=5, delay=10)
In [4]: obj.make_remote_call(success_endpoint)
Call to http://localhost:5000/success succeed with status code = 200
06:07:51,255 INFO: Success: Remote call
Out[4]: <Response [200]>
In [5]: obj.make_remote_call(success_endpoint)
Call to http://localhost:5000/success succeed with status code = 200
06:07:53,610 INFO: Success: Remote call
Out[5]: <Response [200]>
In [6]: vars(obj)
Out[6]:
{'func': <function snippets.make_request(url)>,
'exceptions_to_catch': (Exception,),
'threshold': 5,
'delay': 10,
'state': 'closed',
'last_attempt_timestamp': 1607800073.610199,
'_failed_attempt_count': 0}
Line 1 and Line 2 are just imports. In Line 3, we create a CircuitBreaker object for make_request. Here, we're setting exceptions=(Exception,), which will catch all exceptions. Ideally, we should narrow this down to the exceptions that we actually want to catch (in this case, network exceptions), but we're going to leave it as is for this demo.
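To see what narrowing the exceptions buys us, here is a small self-contained sketch. It uses Python's built-in ConnectionError and TimeoutError as stand-ins so that it runs without a server. With requests, you would pass its own classes instead, for example requests.exceptions.ConnectionError and requests.exceptions.Timeout. The count_failure helper and the two fake callers are made up for this illustration.

```python
# Stand-ins for network errors; with requests you would list its exception classes instead.
NETWORK_EXCEPTIONS = (ConnectionError, TimeoutError)


def count_failure(func, exceptions_to_catch=NETWORK_EXCEPTIONS):
    """Return True if `func` raised one of the listed exceptions, False if it succeeded."""
    try:
        func()
    except exceptions_to_catch:
        return True  # the breaker would count this as a failed remote call
    return False


def unreachable_service():
    raise ConnectionError("connection refused")


def buggy_caller():
    raise ValueError("bad argument")  # a programming error, not a network failure
```

Calling count_failure(unreachable_service) returns True, while count_failure(buggy_caller) lets the ValueError propagate untouched. With a narrow tuple, a bug in our own code would not trip the circuit breaker.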
Now make successive calls to the faulty endpoint.
In [7]: obj.make_remote_call(faulty_endpoint)
In [8]: obj.make_remote_call(faulty_endpoint)
In [9]: obj.make_remote_call(faulty_endpoint)
In [10]: obj.make_remote_call(faulty_endpoint)
In [11]: obj.make_remote_call(faulty_endpoint)
In [12]: obj.make_remote_call(faulty_endpoint)
---------------------------------------------------------------------------
Traceback data ..........
RemoteCallFailedException: Retry after 8.688776969909668 secs
In [13]: obj.make_remote_call(success_endpoint)
---------------------------------------------------------------------------
Traceback data......
RemoteCallFailedException: Retry after 6.096494913101196 secs
Try to make these calls as fast as possible. After the first five calls to the faulty_endpoint, the next call (Line 12) will not make an API request to the Flask server. Instead, it will raise an exception telling you to retry after a specified number of seconds. Even if you make a call to the success_endpoint (Line 13), it will still raise an error, because the circuit is in the "Open" state.
Now, after the delay time has elapsed, if we make a call to the faulty endpoint, it will transition from Half-Open to Open state.
In [18]: obj.make_remote_call(faulty_endpoint)
06:21:24,959 INFO: Changed state from open to half_open
...
06:21:24,964 INFO: Changed state from half_open to open
Now, after the delay has elapsed, if we make a call to the success_endpoint, it will transition from Half-Open to Closed state
In [19]: obj.make_remote_call(success_endpoint)
06:25:10,673 INFO: Changed state from open to half_open
...
06:25:10,678 INFO: Changed state from half_open to closed
Out[19]: <Response [200]>
Now we have a working circuit breaker. We could introduce response caching, monitoring and make it threadsafe. Errors could be handled better. More Exception types could help. All of these features are left as an exercise for the readers.
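As a starting point for the response caching idea, here is one possible sketch. It is not part of the circuit breaker above: CachedFallback is a hypothetical wrapper that remembers the last successful result and returns it when a call fails. It keeps a single cached value and ignores the call arguments, which is an assumption that only makes sense for calls that always fetch the same resource.

```python
class CachedFallback:
    """Remember the last good result of `func` and serve it when a call fails."""

    _MISSING = object()  # sentinel so that None can be a valid cached result

    def __init__(self, func, exceptions=(Exception,)):
        self.func = func
        self.exceptions = exceptions
        self._last_good = self._MISSING

    def __call__(self, *args, **kwargs):
        try:
            result = self.func(*args, **kwargs)
        except self.exceptions:
            if self._last_good is self._MISSING:
                raise  # nothing cached yet, so surface the error
            return self._last_good
        self._last_good = result
        return result


# Example: a fake remote call that can be switched to fail
service = {"down": False}


def fetch_status():
    if service["down"]:
        raise ConnectionError("service is down")
    return "fresh response"


cached_fetch = CachedFallback(fetch_status, exceptions=(ConnectionError,))
```

While the service is up, cached_fetch() returns the fresh response and stores it. Once service["down"] is set to True, the same call returns the stored response instead of raising.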
Finally, improving the API shouldn't take a lot of time. I've added a quick and dirty version here.
# circuit_breaker.py
class APICircuitBreaker:
    def __init__(self, exceptions=(Exception,), threshold=5, delay=60):
        self.obj = functools.partial(
            CircuitBreaker,
            exceptions=exceptions,
            threshold=threshold,
            delay=delay
        )

    def __call__(self, func):
        # build the actual CircuitBreaker now that we know the wrapped function
        self.obj = self.obj(func=func)

        # keep the wrapped function's name and docstring
        @functools.wraps(func)
        def decorator(*args, **kwargs):
            ret_val = self.obj.make_remote_call(*args, **kwargs)
            return ret_val
        return decorator

    def __getattr__(self, item):
        return getattr(self.obj, item)

circuit_breaker = APICircuitBreaker
# snippets.py
import http

import requests

from circuit_breaker import circuit_breaker

@circuit_breaker()
def make_request(url):
    try:
        response = requests.get(url, timeout=0.3)
        if response.status_code == http.HTTPStatus.OK:
            print(f"Call to {url} succeed with status code = {response.status_code}")
            return response
        if 500 <= response.status_code < 600:
            print(f"Call to {url} failed with status code = {response.status_code}")
            raise Exception("Server Issue")
    except Exception:
        print(f"Call to {url} failed")
        raise
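One thing to keep in mind with a decorator like this: the decorated name ends up bound to the inner function, so the breaker's attributes (such as its state) are not reachable through make_request. A possible way around that, sketched below, is to attach the breaker to the wrapper. The with_breaker helper and the PassThroughBreaker stand-in are hypothetical names used only so that the example runs on its own. In the real code the factory would build a CircuitBreaker.

```python
import functools


class PassThroughBreaker:
    """Stand-in for CircuitBreaker: forwards every call and counts them."""

    def __init__(self, func):
        self.func = func
        self.calls = 0

    def make_remote_call(self, *args, **kwargs):
        self.calls += 1
        return self.func(*args, **kwargs)


def with_breaker(breaker_factory):
    """Decorator factory: builds one breaker per decorated function."""
    def decorate(func):
        breaker = breaker_factory(func)

        @functools.wraps(func)  # keep the original name and docstring
        def wrapper(*args, **kwargs):
            return breaker.make_remote_call(*args, **kwargs)

        wrapper.breaker = breaker  # expose the breaker for inspection
        return wrapper
    return decorate


@with_breaker(PassThroughBreaker)
def double(x):
    """Pretend remote call."""
    return x * 2
```

After calling double(2), the breaker can be inspected through double.breaker, and double.__name__ is still "double".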
All code samples can be found here