GraphBit's Production Resilience Engineering: Concrete Code Examples
Based on GraphBit's actual codebase, here are the specific implementations that demonstrate how the "unglamorous but critical work" translates into production-grade resilience engineering:
1. Error Classification Implementation
GraphBit implements sophisticated error classification in `core/src/types.rs` and `core/src/errors.rs`:

```rust
/// Types of errors that can potentially be retried
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RetryableErrorType {
    /// Network connectivity issues
    NetworkError,
    /// Request timeout errors
    TimeoutError,
    /// Rate limiting from external services
    RateLimitError,
    /// Temporary service unavailability
    TemporaryUnavailable,
    /// Internal server errors (5xx)
    InternalServerError,
    /// Authentication/authorization that might be temporary
    AuthenticationError,
    /// Resource conflicts that might resolve
    ResourceConflict,
    /// All other errors (use with caution)
    Other,
}

impl RetryableErrorType {
    /// Determine retry type from error
    pub fn from_error(error: &crate::errors::GraphBitError) -> Self {
        // Simple error type classification based on error message
        let error_str = error.to_string().to_lowercase();
        // ... substring matching on `error_str` selects one of the variants above
    }
}
```

The companion methods on `GraphBitError` in `core/src/errors.rs` decide whether a concrete error is retryable and how long to wait:

```rust
impl GraphBitError {
    /// Check if the error is retryable
    pub fn is_retryable(&self) -> bool {
        matches!(
            self,
            GraphBitError::Network { .. }
                | GraphBitError::RateLimit { .. }
                | GraphBitError::LlmProvider { .. }
                | GraphBitError::Llm { .. }
        )
    }

    /// Get retry delay in seconds for retryable errors
    pub fn retry_delay(&self) -> Option<u64> {
        match self {
            GraphBitError::RateLimit {
                retry_after_seconds,
                ..
            } => Some(*retry_after_seconds),
            GraphBitError::Network { .. } => Some(1),
            GraphBitError::LlmProvider { .. } => Some(2),
            GraphBitError::Llm { .. } => Some(1),
            _ => None,
        }
    }
}
```
2. Adaptive Retry Policies with Exponential Backoff + Jitter
The actual implementation in `core/src/types.rs` shows GraphBit's sophisticated retry logic:

```rust
/// Calculate delay for a given attempt with exponential backoff and jitter
pub fn calculate_delay(&self, attempt: u32) -> u64 {
    if attempt == 0 {
        return 0;
    }

    let base_delay = (self.initial_delay_ms as f64
        * self.backoff_multiplier.powi(attempt as i32 - 1))
    .min(self.max_delay_ms as f64);

    // Add jitter to prevent thundering herd
    let jitter = if self.jitter_factor > 0.0 {
        let max_jitter = base_delay * self.jitter_factor;
        use rand::Rng;
        let mut rng = rand::thread_rng();
        rng.gen_range(-max_jitter..=max_jitter)
    } else {
        0.0
    };

    ((base_delay + jitter).max(0.0) as u64).min(self.max_delay_ms)
}
```
Production Configuration Example:
```rust
impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            initial_delay_ms: 1000,
            backoff_multiplier: 2.0,
            max_delay_ms: DEFAULT_TIMEOUT_MS,
            jitter_factor: 0.1, // 10% jitter prevents thundering herd
            retryable_errors: vec![
                RetryableErrorType::NetworkError,
                RetryableErrorType::TimeoutError,
                RetryableErrorType::TemporaryUnavailable,
                RetryableErrorType::InternalServerError,
            ],
        }
    }
}
```
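With these defaults, the retry schedule is roughly 1s, 2s, 4s. A minimal Python sketch of the same formula makes that easy to verify; the 30-second cap below is only a stand-in for `DEFAULT_TIMEOUT_MS`, whose actual value is not shown here:

```python
import random

def calculate_delay(attempt: int, initial_delay_ms: int = 1000,
                    backoff_multiplier: float = 2.0,
                    max_delay_ms: int = 30_000,  # placeholder for DEFAULT_TIMEOUT_MS
                    jitter_factor: float = 0.1) -> float:
    """Python mirror of the Rust calculate_delay: exponential backoff plus jitter."""
    if attempt == 0:
        return 0.0
    base_delay = min(initial_delay_ms * backoff_multiplier ** (attempt - 1), max_delay_ms)
    jitter = random.uniform(-base_delay * jitter_factor, base_delay * jitter_factor)
    return min(max(base_delay + jitter, 0.0), max_delay_ms)

# With the defaults above: ~1000ms, ~2000ms, ~4000ms, each with +/-10% jitter
for attempt in (1, 2, 3):
    print(f"attempt {attempt}: ~{calculate_delay(attempt):.0f} ms")
```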
3. Circuit Breaker State Management: Closed → Open → Half-Open
GraphBit implements the full circuit breaker pattern in `python/src/llm/client.rs`:

```rust
/// Circuit breaker state for resilience
#[derive(Debug, Clone)]
enum CircuitBreakerState {
    Closed { failure_count: u32 },
    Open { opened_at: Instant },
    HalfOpen,
}

impl CircuitBreaker {
    async fn can_execute(&self) -> bool {
        if !self.config.circuit_breaker_enabled {
            return true;
        }
        let mut state = self.state.write().await;
        match *state {
            CircuitBreakerState::Closed { .. } => true,
            CircuitBreakerState::Open { opened_at } => {
                if opened_at.elapsed() > self.config.circuit_breaker_recovery_timeout {
                    *state = CircuitBreakerState::HalfOpen;
                    true
                } else {
                    false
                }
            }
            CircuitBreakerState::HalfOpen => true,
        }
    }

    async fn record_failure(&self) {
        let mut state = self.state.write().await;
        match *state {
            CircuitBreakerState::Closed { failure_count } => {
                let new_count = failure_count + 1;
                if new_count >= self.config.circuit_breaker_threshold {
                    *state = CircuitBreakerState::Open {
                        opened_at: Instant::now(),
                    };
                    if self.config.debug {
                        warn!("Circuit breaker opened due to {} failures", new_count);
                    }
                } else {
                    *state = CircuitBreakerState::Closed {
                        failure_count: new_count,
                    };
                }
            }
            CircuitBreakerState::HalfOpen => {
                *state = CircuitBreakerState::Open {
                    opened_at: Instant::now(),
                };
                if self.config.debug {
                    warn!("Circuit breaker reopened after failed recovery attempt");
                }
            }
            _ => {}
        }
    }
}
```
4. Production Resilience Features: Cost Efficiency & Compute Waste Prevention
Production Client Configuration (`python/src/llm/client.rs`):

```rust
impl Default for ClientConfig {
    fn default() -> Self {
        Self {
            request_timeout: Duration::from_secs(120), // Increased to match Ollama's capabilities
            max_retries: 3,
            base_retry_delay: Duration::from_millis(100),
            max_retry_delay: Duration::from_secs(5),
            circuit_breaker_enabled: true,
            circuit_breaker_threshold: 5,
            circuit_breaker_recovery_timeout: Duration::from_secs(60),
            debug: false,
        }
    }
}
```
Resilient Request Execution that prevents the 30-40% compute waste:

```rust
/// Execute a single request with retry logic and circuit breaker
async fn execute_request_with_resilience(
    provider: Arc<RwLock<Box<dyn LlmProviderTrait>>>,
    circuit_breaker: Arc<CircuitBreaker>,
    stats: Arc<RwLock<ClientStats>>,
    config: ClientConfig,
    request: LlmRequest,
) -> Result<graphbit_core::llm::LlmResponse, PyErr> {
    // Check circuit breaker
    if !circuit_breaker.can_execute().await {
        return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            "Circuit breaker is open, request rejected",
        ));
    }

    let mut last_error = None;
    let mut delay = config.base_retry_delay;

    for attempt in 0..=config.max_retries {
        // `provider_request` is the provider call future, built fresh for each attempt
        match timeout(config.request_timeout, provider_request).await {
            Ok(Ok(response)) => {
                circuit_breaker.record_success().await;
                return Ok(response);
            }
            Ok(Err(e)) => {
                circuit_breaker.record_failure().await;
                last_error = Some(to_py_error(e));
            }
            Err(_) => {
                // Timeout occurred
                circuit_breaker.record_failure().await;
                last_error = Some(timeout_error(
                    "llm_request",
                    config.request_timeout.as_millis() as u64,
                    "Request timed out",
                ));
            }
        }

        // Exponential backoff with jitter
        if attempt < config.max_retries {
            tokio::time::sleep(delay).await;
            delay = (delay * 2).min(config.max_retry_delay);
        }
    }

    // All attempts exhausted: surface the last observed error
    Err(last_error.expect("at least one attempt was made"))
}
```
5. Python API Usage Examples
Production Configuration (`docs/api-reference/configuration.md`):

```python
import os

from graphbit import Executor, LlmConfig, init  # assumed import surface


def create_prod_config():
    """Configuration for production environment."""
    # Initialize without debugging
    init(debug=False, log_level="warn")

    # High-quality model for production
    config = LlmConfig.openai(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )

    # High-throughput executor for production
    executor = Executor.new_high_throughput(
        config,
        timeout_seconds=300,
        debug=False
    )

    # Configure for production reliability
    executor.configure(
        timeout_seconds=300,
        max_retries=3,
        enable_metrics=True,
        debug=False
    )
    return executor
```
Circuit Breaker Usage (`docs/user-guide/reliability.md`):

```python
def create_circuit_breaker_executor():
    """Create executor with circuit breaker protection."""
    # Base executor
    llm_config = LlmConfig.openai(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )
    base_executor = Executor(llm_config)

    # Circuit breaker configuration
    circuit_breaker = CircuitBreaker(
        failure_threshold=3,   # Open after 3 failures
        recovery_timeout=30,   # Try again after 30 seconds
        timeout_seconds=60     # Individual execution timeout
    )

    reliable_executor = ReliableExecutor(base_executor, circuit_breaker)
    return reliable_executor
```
6. Cost Efficiency: Model Selection & Pricing
GraphBit includes actual cost tracking in `core/src/llm/openai.rs`:

```rust
fn cost_per_token(&self) -> Option<(f64, f64)> {
    // Cost per token in USD (input, output); rates change over time
    match self.model.as_str() {
        "gpt-4" => Some((0.00003, 0.00006)),
        "gpt-4-32k" => Some((0.00006, 0.00012)),
        "gpt-4-turbo" => Some((0.00001, 0.00003)),
        "gpt-4o" => Some((0.000005, 0.000015)),
        "gpt-4o-mini" => Some((0.00000015, 0.0000006)), // Most cost-effective
        "gpt-3.5-turbo" => Some((0.0000015, 0.000002)),
        "gpt-3.5-turbo-16k" => Some((0.000003, 0.000004)),
        _ => None,
    }
}
```
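With these rates, estimating spend is a straightforward multiply-and-add. A back-of-the-envelope sketch using the rates above (the token counts are illustrative):

```python
# (input, output) cost per token in USD, copied from the table above
RATES = {
    "gpt-4o-mini": (0.00000015, 0.0000006),
    "gpt-4o": (0.000005, 0.000015),
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    rate_in, rate_out = RATES[model]
    return input_tokens * rate_in + output_tokens * rate_out

# A typical request with a 1,000-token prompt and 500-token completion
per_request = estimate_cost("gpt-4o-mini", 1_000, 500)
print(f"per request: ${per_request:.6f}")                 # $0.000450
print(f"per 10k requests: ${per_request * 10_000:.2f}")   # $4.50
```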
7. Tunable Configuration Options
Developers can configure resilience behavior through multiple parameters on `RetryConfig`:

```rust
impl RetryConfig {
    /// Configure exponential backoff
    pub fn with_exponential_backoff(
        mut self,
        initial_delay_ms: u64,
        multiplier: f64,
        max_delay_ms: u64,
    ) -> Self {
        self.initial_delay_ms = initial_delay_ms;
        self.backoff_multiplier = multiplier;
        self.max_delay_ms = max_delay_ms;
        self
    }

    /// Configure jitter factor
    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
        self.jitter_factor = jitter_factor.clamp(0.0, 1.0);
        self
    }

    /// Configure which errors should trigger retries
    pub fn with_retryable_errors(mut self, errors: Vec<RetryableErrorType>) -> Self {
        self.retryable_errors = errors;
        self
    }
}
```
Key Business Impact Features
- Prevents 30-40% Compute Waste: The circuit breaker immediately stops requests when providers are down, preventing wasted API calls and costs.
- Intelligent Error Classification: Different error types get different retry strategies - rate limits get longer delays, network errors get immediate retries.
- Jitter Prevents Thundering Herd: Random jitter in retry delays prevents all clients from retrying simultaneously.
- Cost-Aware Model Selection: Built-in cost tracking helps developers choose the most cost-effective models (like gpt-4o-mini at $0.00000015 per input token).
- Production-Ready Defaults: Sensible defaults (3 retries, 10% jitter, 60s circuit breaker recovery) work out of the box for production workloads.
This implementation demonstrates how GraphBit's "unglamorous but critical work" translates into measurable business value: reduced costs, improved reliability, and predictable performance for production AI systems handling thousands of API calls daily.
For posting purposes, the high-level code examples below can be used instead:
Production-Grade Resilience Engineering: Architectural Patterns & Implementation Examples
Here are realistic, production-quality code implementations that demonstrate enterprise-grade resilience engineering patterns:
1. Circuit Breaker Pattern - Production Implementation
```python
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any, Dict
import time
import asyncio
import logging
from contextlib import asynccontextmanager


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout_seconds: int = 60
    success_threshold: int = 3
    timeout_seconds: int = 30


class CircuitBreakerMetrics:
    def __init__(self):
        self.total_requests = 0
        self.failed_requests = 0
        self.successful_requests = 0
        self.circuit_opens = 0
        self.last_failure_time: Optional[float] = None

    @property
    def failure_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.failed_requests / self.total_requests


class ProductionCircuitBreaker:
    """
    Production-grade circuit breaker with comprehensive monitoring and logging.
    Prevents cascading failures in distributed systems.
    """

    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[float] = None
        self.metrics = CircuitBreakerMetrics()
        self.logger = logging.getLogger(f"circuit_breaker.{name}")

    def can_execute(self) -> bool:
        """Check if request should be allowed through the circuit breaker."""
        current_time = time.time()
        if self.state == CircuitState.CLOSED:
            return True
        elif self.state == CircuitState.OPEN:
            if (self.last_failure_time and
                    current_time - self.last_failure_time >= self.config.recovery_timeout_seconds):
                self._transition_to_half_open()
                return True
            return False
        elif self.state == CircuitState.HALF_OPEN:
            return True
        return False

    def record_success(self):
        """Record successful operation and update circuit state."""
        self.metrics.successful_requests += 1
        self.metrics.total_requests += 1
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self._transition_to_closed()
        elif self.state == CircuitState.CLOSED:
            self.failure_count = 0

    def record_failure(self):
        """Record failed operation and update circuit state."""
        self.metrics.failed_requests += 1
        self.metrics.total_requests += 1
        self.last_failure_time = time.time()
        self.metrics.last_failure_time = self.last_failure_time
        if self.state == CircuitState.CLOSED:
            self.failure_count += 1
            if self.failure_count >= self.config.failure_threshold:
                self._transition_to_open()
        elif self.state == CircuitState.HALF_OPEN:
            self._transition_to_open()

    def _transition_to_open(self):
        """Transition circuit breaker to OPEN state."""
        self.state = CircuitState.OPEN
        self.metrics.circuit_opens += 1
        self.logger.warning(
            f"Circuit breaker '{self.name}' opened after {self.failure_count} failures. "
            f"Failure rate: {self.metrics.failure_rate:.2%}"
        )

    def _transition_to_half_open(self):
        """Transition circuit breaker to HALF_OPEN state."""
        self.state = CircuitState.HALF_OPEN
        self.success_count = 0
        self.logger.info(f"Circuit breaker '{self.name}' transitioned to HALF_OPEN")

    def _transition_to_closed(self):
        """Transition circuit breaker to CLOSED state."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.logger.info(f"Circuit breaker '{self.name}' closed after recovery")

    def get_metrics(self) -> Dict[str, Any]:
        """Get comprehensive circuit breaker metrics."""
        return {
            "name": self.name,
            "state": self.state.value,
            "total_requests": self.metrics.total_requests,
            "successful_requests": self.metrics.successful_requests,
            "failed_requests": self.metrics.failed_requests,
            "failure_rate": self.metrics.failure_rate,
            "circuit_opens": self.metrics.circuit_opens,
            "current_failure_count": self.failure_count,
            "last_failure_time": self.metrics.last_failure_time,
        }


class CircuitBreakerOpenError(Exception):
    """Raised when circuit breaker is open and requests are rejected."""
    pass


@asynccontextmanager
async def circuit_breaker_context(circuit_breaker: ProductionCircuitBreaker):
    """Context manager for circuit breaker execution."""
    if not circuit_breaker.can_execute():
        raise CircuitBreakerOpenError(f"Circuit breaker '{circuit_breaker.name}' is open")
    try:
        yield
        circuit_breaker.record_success()
    except Exception:
        circuit_breaker.record_failure()
        raise
```
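A minimal usage sketch for the pieces above; `failing_call` is a hypothetical stand-in for any network operation:

```python
async def demo_circuit_breaker():
    breaker = ProductionCircuitBreaker(
        "payments_api", CircuitBreakerConfig(failure_threshold=2)
    )

    async def failing_call():
        raise ConnectionError("upstream unavailable")

    # Two failures trip the breaker; the third call is rejected outright
    for _ in range(3):
        try:
            async with circuit_breaker_context(breaker):
                await failing_call()
        except (ConnectionError, CircuitBreakerOpenError) as e:
            print(f"state={breaker.state.value}: {e}")

asyncio.run(demo_circuit_breaker())
```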
2. Adaptive Retry Logic with Exponential Backoff + Jitter
```python
import random
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Type, Union


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay_seconds: float = 1.0
    max_delay_seconds: float = 60.0
    exponential_base: float = 2.0
    jitter_factor: float = 0.1
    retryable_exceptions: List[Type[Exception]] = None

    def __post_init__(self):
        if self.retryable_exceptions is None:
            self.retryable_exceptions = [
                ConnectionError,
                TimeoutError,
                IOError,
                # Add domain-specific exceptions
            ]


class RetryMetrics:
    def __init__(self):
        self.total_attempts = 0
        self.successful_operations = 0
        self.failed_operations = 0
        self.retry_attempts = 0
        self.average_attempts_per_operation = 0.0

    def record_attempt(self):
        self.total_attempts += 1

    def record_success(self, attempts_used: int):
        self.successful_operations += 1
        self.retry_attempts += attempts_used - 1
        self._update_average()

    def record_failure(self, attempts_used: int):
        self.failed_operations += 1
        self.retry_attempts += attempts_used - 1
        self._update_average()

    def _update_average(self):
        total_ops = self.successful_operations + self.failed_operations
        if total_ops > 0:
            self.average_attempts_per_operation = self.total_attempts / total_ops


class ProductionRetryHandler:
    """
    Production-grade retry handler with intelligent backoff strategies.
    Prevents thundering herd problems and provides comprehensive monitoring.
    """

    def __init__(self, name: str, config: RetryConfig):
        self.name = name
        self.config = config
        self.metrics = RetryMetrics()
        self.logger = logging.getLogger(f"retry_handler.{name}")

    def calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter."""
        if attempt <= 1:
            return 0

        # Exponential backoff
        delay = min(
            self.config.base_delay_seconds * (self.config.exponential_base ** (attempt - 2)),
            self.config.max_delay_seconds,
        )

        # Add jitter to prevent thundering herd
        if self.config.jitter_factor > 0:
            jitter_range = delay * self.config.jitter_factor
            jitter = random.uniform(-jitter_range, jitter_range)
            delay = max(0, delay + jitter)

        return delay

    def is_retryable(self, exception: Exception) -> bool:
        """Determine if an exception should trigger a retry."""
        return any(
            isinstance(exception, exc_type)
            for exc_type in self.config.retryable_exceptions
        )

    async def execute_with_retry(self, operation: Callable, *args, **kwargs) -> Any:
        """Execute operation with retry logic."""
        last_exception = None

        for attempt in range(1, self.config.max_attempts + 1):
            self.metrics.record_attempt()
            try:
                self.logger.debug(f"Executing {self.name} (attempt {attempt})")
                result = await self._execute_operation(operation, *args, **kwargs)
                self.metrics.record_success(attempt)
                if attempt > 1:
                    self.logger.info(f"Operation {self.name} succeeded on attempt {attempt}")
                return result
            except Exception as e:
                last_exception = e

                if not self.is_retryable(e):
                    self.logger.error(f"Non-retryable error in {self.name}: {e}")
                    self.metrics.record_failure(attempt)
                    raise

                if attempt == self.config.max_attempts:
                    self.logger.error(
                        f"Operation {self.name} failed after {attempt} attempts: {e}"
                    )
                    self.metrics.record_failure(attempt)
                    raise

                delay = self.calculate_delay(attempt)
                self.logger.warning(
                    f"Attempt {attempt} failed for {self.name}: {e}. "
                    f"Retrying in {delay:.2f}s"
                )
                if delay > 0:
                    await asyncio.sleep(delay)

        # This should never be reached, but included for completeness
        raise last_exception

    async def _execute_operation(self, operation: Callable, *args, **kwargs) -> Any:
        """Execute the operation, handling both sync and async callables."""
        if asyncio.iscoroutinefunction(operation):
            return await operation(*args, **kwargs)
        else:
            return operation(*args, **kwargs)

    def get_metrics(self) -> Dict[str, Any]:
        """Get comprehensive retry metrics."""
        return {
            "name": self.name,
            "total_attempts": self.metrics.total_attempts,
            "successful_operations": self.metrics.successful_operations,
            "failed_operations": self.metrics.failed_operations,
            "retry_attempts": self.metrics.retry_attempts,
            "average_attempts_per_operation": self.metrics.average_attempts_per_operation,
            "success_rate": (
                self.metrics.successful_operations
                / max(1, self.metrics.successful_operations + self.metrics.failed_operations)
            ),
        }


# Decorator for easy retry application
def with_retry(config: RetryConfig, name: Optional[str] = None):
    """Decorator to add retry logic to functions."""
    def decorator(func):
        handler_name = name or f"{func.__module__}.{func.__name__}"
        retry_handler = ProductionRetryHandler(handler_name, config)

        async def async_wrapper(*args, **kwargs):
            return await retry_handler.execute_with_retry(func, *args, **kwargs)

        def sync_wrapper(*args, **kwargs):
            return asyncio.run(retry_handler.execute_with_retry(func, *args, **kwargs))

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        else:
            return sync_wrapper

    return decorator
```
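The decorator keeps call sites clean. A small sketch, where `fetch_profile` is a hypothetical coroutine standing in for a real API call (ConnectionError is retryable under the default config):

```python
flaky_config = RetryConfig(max_attempts=3, base_delay_seconds=0.5)

@with_retry(flaky_config, name="fetch_profile")
async def fetch_profile(user_id: int) -> dict:
    # Simulated transient failure on roughly half the calls
    if random.random() < 0.5:
        raise ConnectionError("transient network blip")
    return {"user_id": user_id, "status": "ok"}

# Retries happen transparently; after max_attempts the last error propagates
print(asyncio.run(fetch_profile(42)))
```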
3. Intelligent Error Classification System
```python
import re
import time
import logging
from enum import Enum
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Type


class ErrorSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class ErrorCategory(Enum):
    NETWORK = "network"
    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    RATE_LIMIT = "rate_limit"
    TIMEOUT = "timeout"
    VALIDATION = "validation"
    RESOURCE_EXHAUSTION = "resource_exhaustion"
    SYSTEM = "system"
    UNKNOWN = "unknown"


@dataclass
class ErrorClassification:
    category: ErrorCategory
    severity: ErrorSeverity
    is_retryable: bool
    suggested_delay: float
    max_retries: int
    escalation_required: bool = False


class ErrorPattern:
    def __init__(
        self,
        pattern: str,
        classification: ErrorClassification,
        exception_types: List[Type[Exception]] = None,
    ):
        self.pattern = re.compile(pattern, re.IGNORECASE)
        self.classification = classification
        self.exception_types = exception_types or []


class ProductionErrorClassifier:
    """
    Production-grade error classification system for intelligent error handling.
    Provides context-aware error categorization and response strategies.
    """

    def __init__(self):
        self.patterns: List[ErrorPattern] = []
        self.classification_cache: Dict[str, ErrorClassification] = {}
        self.metrics = {
            "classifications": 0,
            "cache_hits": 0,
            "category_counts": {category.value: 0 for category in ErrorCategory},
        }
        self.logger = logging.getLogger("error_classifier")
        self._initialize_default_patterns()

    def _initialize_default_patterns(self):
        """Initialize default error classification patterns."""
        patterns = [
            # Network errors
            ErrorPattern(
                r"connection.*(?:refused|reset|timeout|failed)",
                ErrorClassification(
                    ErrorCategory.NETWORK, ErrorSeverity.MEDIUM,
                    is_retryable=True, suggested_delay=2.0, max_retries=3,
                ),
                [ConnectionError, OSError],
            ),
            # Authentication errors
            ErrorPattern(
                r"(?:auth|unauthorized|invalid.*key|forbidden)",
                ErrorClassification(
                    ErrorCategory.AUTHENTICATION, ErrorSeverity.HIGH,
                    is_retryable=False, suggested_delay=0, max_retries=0,
                    escalation_required=True,
                ),
                [PermissionError],
            ),
            # Rate limiting
            ErrorPattern(
                r"(?:rate.*limit|quota.*exceeded|throttle|too.*many.*requests)",
                ErrorClassification(
                    ErrorCategory.RATE_LIMIT, ErrorSeverity.MEDIUM,
                    is_retryable=True, suggested_delay=60.0, max_retries=5,
                ),
            ),
            # Timeout errors
            ErrorPattern(
                r"(?:timeout|timed.*out|deadline.*exceeded)",
                ErrorClassification(
                    ErrorCategory.TIMEOUT, ErrorSeverity.MEDIUM,
                    is_retryable=True, suggested_delay=5.0, max_retries=3,
                ),
                [TimeoutError],
            ),
            # Resource exhaustion
            ErrorPattern(
                r"(?:memory|resource|capacity|limit.*exceeded|out.*of.*space)",
                ErrorClassification(
                    ErrorCategory.RESOURCE_EXHAUSTION, ErrorSeverity.HIGH,
                    is_retryable=True, suggested_delay=30.0, max_retries=2,
                    escalation_required=True,
                ),
                [MemoryError],
            ),
            # Validation errors
            ErrorPattern(
                r"(?:invalid.*input|validation.*failed|bad.*request|malformed)",
                ErrorClassification(
                    ErrorCategory.VALIDATION, ErrorSeverity.LOW,
                    is_retryable=False, suggested_delay=0, max_retries=0,
                ),
                [ValueError],
            ),
        ]
        self.patterns.extend(patterns)

    def classify_error(
        self,
        exception: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> ErrorClassification:
        """Classify an error and return appropriate handling strategy."""
        error_key = f"{type(exception).__name__}:{str(exception)}"

        # Check cache first
        if error_key in self.classification_cache:
            self.metrics["cache_hits"] += 1
            return self.classification_cache[error_key]

        self.metrics["classifications"] += 1

        # Try exception type matching first
        for pattern in self.patterns:
            if any(isinstance(exception, exc_type) for exc_type in pattern.exception_types):
                classification = pattern.classification
                break
        else:
            # Fall back to message pattern matching
            error_message = str(exception)
            classification = None
            for pattern in self.patterns:
                if pattern.pattern.search(error_message):
                    classification = pattern.classification
                    break

        if classification is None:
            classification = ErrorClassification(
                ErrorCategory.UNKNOWN, ErrorSeverity.MEDIUM,
                is_retryable=True, suggested_delay=1.0, max_retries=1,
            )

        # Apply context-based adjustments
        if context:
            classification = self._apply_context_adjustments(classification, context)

        # Cache the result
        self.classification_cache[error_key] = classification
        self.metrics["category_counts"][classification.category.value] += 1

        self.logger.debug(
            f"Classified error as {classification.category.value} "
            f"(severity: {classification.severity.value}, "
            f"retryable: {classification.is_retryable})"
        )
        return classification

    def _apply_context_adjustments(
        self,
        classification: ErrorClassification,
        context: Dict[str, Any],
    ) -> ErrorClassification:
        """Apply context-specific adjustments to error classification."""
        # Example context-based adjustments
        if context.get("is_critical_operation", False):
            if classification.severity == ErrorSeverity.MEDIUM:
                classification.severity = ErrorSeverity.HIGH
                classification.escalation_required = True

        if context.get("retry_count", 0) > 2:
            # Increase delay for repeated failures
            classification.suggested_delay *= 2

        return classification

    def add_custom_pattern(self, pattern: ErrorPattern):
        """Add custom error pattern for domain-specific errors."""
        self.patterns.insert(0, pattern)  # Insert at beginning for priority
        self.logger.info(f"Added custom error pattern: {pattern.pattern.pattern}")

    def get_metrics(self) -> Dict[str, Any]:
        """Get error classification metrics."""
        return {
            "total_classifications": self.metrics["classifications"],
            "cache_hits": self.metrics["cache_hits"],
            "cache_hit_rate": (
                self.metrics["cache_hits"] / max(1, self.metrics["classifications"])
            ),
            "category_distribution": self.metrics["category_counts"],
            "patterns_loaded": len(self.patterns),
        }


class ErrorHandler:
    """High-level error handler that integrates classification with response strategies."""

    def __init__(self, classifier: ProductionErrorClassifier):
        self.classifier = classifier
        self.logger = logging.getLogger("error_handler")

    async def handle_error(
        self,
        exception: Exception,
        context: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Handle an error based on its classification.
        Returns True if the operation should be retried, False otherwise.
        """
        classification = self.classifier.classify_error(exception, context)

        # Log the error with appropriate level
        log_message = f"Handling {classification.category.value} error: {exception}"
        if classification.severity == ErrorSeverity.CRITICAL:
            self.logger.critical(log_message)
        elif classification.severity == ErrorSeverity.HIGH:
            self.logger.error(log_message)
        elif classification.severity == ErrorSeverity.MEDIUM:
            self.logger.warning(log_message)
        else:
            self.logger.info(log_message)

        # Trigger escalation if required
        if classification.escalation_required:
            await self._escalate_error(exception, classification, context)

        return classification.is_retryable

    async def _escalate_error(
        self,
        exception: Exception,
        classification: ErrorClassification,
        context: Optional[Dict[str, Any]],
    ):
        """Escalate critical errors to monitoring/alerting systems."""
        escalation_data = {
            "error": str(exception),
            "category": classification.category.value,
            "severity": classification.severity.value,
            "context": context or {},
            "timestamp": time.time(),
        }
        # In production, this would integrate with your alerting system
        self.logger.critical(f"ESCALATION REQUIRED: {escalation_data}")
```
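A quick sketch of the classifier in action, using plain built-in exceptions; the resulting categories follow the default patterns registered above:

```python
classifier = ProductionErrorClassifier()

for exc in (
    ConnectionError("connection refused by host"),   # matched by exception type
    RuntimeError("quota exceeded for project"),      # matched by message pattern
    ValueError("validation failed for field 'id'"),  # non-retryable by type
):
    c = classifier.classify_error(exc)
    print(f"{type(exc).__name__}: category={c.category.value}, "
          f"retryable={c.is_retryable}, delay={c.suggested_delay}s")
```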
4. Integrated Resilience Framework
```python
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ResilienceConfig:
    circuit_breaker: CircuitBreakerConfig
    retry: RetryConfig
    timeout_seconds: float = 30.0
    enable_metrics: bool = True
    enable_logging: bool = True


class ProductionResilienceFramework:
    """
    Integrated resilience framework combining circuit breaker, retry logic,
    error classification, and comprehensive monitoring.
    """

    def __init__(self, name: str, config: ResilienceConfig):
        self.name = name
        self.config = config

        # Initialize components
        self.circuit_breaker = ProductionCircuitBreaker(
            f"{name}_circuit", config.circuit_breaker
        )
        self.retry_handler = ProductionRetryHandler(f"{name}_retry", config.retry)
        self.error_classifier = ProductionErrorClassifier()
        self.error_handler = ErrorHandler(self.error_classifier)

        # Metrics and logging
        self.metrics = {
            "total_operations": 0,
            "successful_operations": 0,
            "failed_operations": 0,
            "circuit_breaker_rejections": 0,
            "average_response_time_ms": 0.0,
            "uptime_start": time.time(),
        }
        self.logger = logging.getLogger(f"resilience.{name}")

    async def execute(
        self,
        operation: Callable,
        *args,
        context: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Any:
        """Execute operation with full resilience protection."""
        start_time = time.time()
        self.metrics["total_operations"] += 1

        try:
            # Circuit breaker check
            async with circuit_breaker_context(self.circuit_breaker):
                # Execute with timeout and retry
                result = await asyncio.wait_for(
                    self.retry_handler.execute_with_retry(operation, *args, **kwargs),
                    timeout=self.config.timeout_seconds,
                )

                # Record success metrics
                duration_ms = (time.time() - start_time) * 1000
                self._update_success_metrics(duration_ms)
                return result

        except CircuitBreakerOpenError:
            self.metrics["circuit_breaker_rejections"] += 1
            self.logger.warning(f"Operation rejected by circuit breaker: {self.name}")
            raise
        except Exception as e:
            # Classify and handle the error
            should_retry = await self.error_handler.handle_error(e, context)
            self._update_failure_metrics()
            if not should_retry:
                self.logger.error(f"Non-retryable error in {self.name}: {e}")
            raise

    def _update_success_metrics(self, duration_ms: float):
        """Update success metrics with response time tracking."""
        self.metrics["successful_operations"] += 1
        # Update average response time (exponential moving average)
        alpha = 0.1  # Smoothing factor
        current_avg = self.metrics["average_response_time_ms"]
        self.metrics["average_response_time_ms"] = (
            alpha * duration_ms + (1 - alpha) * current_avg
        )

    def _update_failure_metrics(self):
        """Update failure metrics."""
        self.metrics["failed_operations"] += 1

    def get_health_status(self) -> Dict[str, Any]:
        """Get comprehensive health status of the resilience framework."""
        uptime_seconds = time.time() - self.metrics["uptime_start"]
        return {
            "name": self.name,
            "status": (
                "healthy"
                if self.circuit_breaker.state == CircuitState.CLOSED
                else "degraded"
            ),
            "uptime_seconds": uptime_seconds,
            "metrics": {
                **self.metrics,
                "success_rate": self._calculate_success_rate(),
                "circuit_breaker": self.circuit_breaker.get_metrics(),
                "retry_handler": self.retry_handler.get_metrics(),
                "error_classifier": self.error_classifier.get_metrics(),
            },
        }

    def _calculate_success_rate(self) -> float:
        """Calculate overall success rate."""
        total_ops = self.metrics["successful_operations"] + self.metrics["failed_operations"]
        if total_ops == 0:
            return 1.0
        return self.metrics["successful_operations"] / total_ops

    def reset_metrics(self):
        """Reset all metrics (useful for testing or periodic resets)."""
        self.metrics = {
            "total_operations": 0,
            "successful_operations": 0,
            "failed_operations": 0,
            "circuit_breaker_rejections": 0,
            "average_response_time_ms": 0.0,
            "uptime_start": time.time(),
        }
        # Reset component metrics
        self.circuit_breaker.metrics = CircuitBreakerMetrics()
        self.retry_handler.metrics = RetryMetrics()


# Factory function for easy framework creation
def create_resilience_framework(
    name: str,
    failure_threshold: int = 5,
    max_retries: int = 3,
    timeout_seconds: float = 30.0,
) -> ProductionResilienceFramework:
    """Create a resilience framework with sensible defaults."""
    config = ResilienceConfig(
        circuit_breaker=CircuitBreakerConfig(failure_threshold=failure_threshold),
        retry=RetryConfig(max_attempts=max_retries),
        timeout_seconds=timeout_seconds,
    )
    return ProductionResilienceFramework(name, config)
```
5. Usage Examples - Production Integration
```python
import asyncio
import aiohttp
from typing import Any, Callable, Dict


# Example: HTTP Client with Resilience
class ResilientHttpClient:
    """Production HTTP client with comprehensive resilience patterns."""

    def __init__(self):
        # Configure resilience for different operation types
        self.api_resilience = create_resilience_framework(
            "api_calls",
            failure_threshold=3,
            max_retries=3,
            timeout_seconds=30.0,
        )
        self.batch_resilience = create_resilience_framework(
            "batch_operations",
            failure_threshold=5,
            max_retries=2,
            timeout_seconds=120.0,
        )

        # Add custom error patterns for HTTP-specific errors
        self.api_resilience.error_classifier.add_custom_pattern(
            ErrorPattern(
                r"5\d{2}.*server.*error",
                ErrorClassification(
                    ErrorCategory.SYSTEM,
                    ErrorSeverity.HIGH,
                    is_retryable=True,
                    suggested_delay=10.0,
                    max_retries=3,
                ),
            )
        )

    async def make_api_call(self, url: str, **kwargs) -> Dict[str, Any]:
        """Make API call with full resilience protection."""
        # `critical` is our own flag, not an aiohttp option; strip it before the request
        critical = kwargs.pop("critical", False)

        async def _api_operation():
            async with aiohttp.ClientSession() as session:
                async with session.get(url, **kwargs) as response:
                    response.raise_for_status()
                    return await response.json()

        context = {"url": url, "is_critical_operation": critical}
        return await self.api_resilience.execute(_api_operation, context=context)

    async def batch_process(self, items: list, processor_func: Callable) -> list:
        """Process items in batch with resilience."""
        async def _batch_operation():
            results = []
            for item in items:
                result = await processor_func(item)
                results.append(result)
            return results

        return await self.batch_resilience.execute(_batch_operation)

    def get_health_metrics(self) -> Dict[str, Any]:
        """Get comprehensive health metrics for monitoring."""
        return {
            "api_operations": self.api_resilience.get_health_status(),
            "batch_operations": self.batch_resilience.get_health_status(),
            "overall_status": self._calculate_overall_health(),
        }

    def _calculate_overall_health(self) -> str:
        """Calculate overall system health status."""
        api_healthy = self.api_resilience.circuit_breaker.state == CircuitState.CLOSED
        batch_healthy = self.batch_resilience.circuit_breaker.state == CircuitState.CLOSED
        if api_healthy and batch_healthy:
            return "healthy"
        elif api_healthy or batch_healthy:
            return "degraded"
        else:
            return "unhealthy"


# Example usage
async def main():
    client = ResilientHttpClient()
    try:
        # Make resilient API calls
        result = await client.make_api_call("https://api.example.com/data", critical=True)
        print(f"API Result: {result}")

        # Process batch with resilience
        items = [1, 2, 3, 4, 5]

        async def process_item(item):
            # Simulate processing that might fail
            if item == 3:
                raise ConnectionError("Simulated network error")
            return item * 2

        batch_results = await client.batch_process(items, process_item)
        print(f"Batch Results: {batch_results}")

    except Exception as e:
        print(f"Operation failed: {e}")

    # Get health metrics for monitoring
    health = client.get_health_metrics()
    print(f"System Health: {health}")


if __name__ == "__main__":
    asyncio.run(main())
```
Key Architectural Benefits
- Separation of Concerns: Each component (circuit breaker, retry, error classification) is independently configurable and testable
- Comprehensive Monitoring: Built-in metrics collection for observability and alerting
- Context-Aware Decisions: Error handling adapts based on operation context and history
- Production-Ready Logging: Structured logging with appropriate severity levels
- Flexible Configuration: Easy to tune for different operation types and requirements
- Graceful Degradation: System continues operating even when some components are in degraded states
- Cost Efficiency: Intelligent backoff and circuit breaking prevent resource waste
These patterns demonstrate enterprise-grade resilience engineering that prevents cascading failures, reduces operational costs, and provides the reliability needed for production AI systems handling thousands of requests daily.