Yeahia Sarker

GraphBit's Production Resilience Engineering: Key Implementations and Cost-Efficient Strategies

GraphBit's Production Resilience Engineering: Concrete Code Examples

Based on GraphBit's actual codebase, here are the specific implementations that demonstrate how the "unglamorous but critical work" translates into production-grade resilience engineering:

1. Error Classification Implementation

GraphBit implements sophisticated error classification in core/src/types.rs and core/src/errors.rs:

/// Types of errors that can potentially be retried
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RetryableErrorType {
    /// Network connectivity issues
    NetworkError,
    /// Request timeout errors
    TimeoutError,
    /// Rate limiting from external services
    RateLimitError,
    /// Temporary service unavailability
    TemporaryUnavailable,
    /// Internal server errors (5xx)
    InternalServerError,
    /// Authentication/authorization that might be temporary
    AuthenticationError,
    /// Resource conflicts that might resolve
    ResourceConflict,
    /// All other errors (use with caution)
    Other,
}

impl RetryableErrorType {
    /// Determine retry type from error
    pub fn from_error(error: &crate::errors::GraphBitError) -> Self {
        // Simple error type classification based on error message
        let error_str = error.to_string().to_lowercase();
        // ...
    }
}

/// Check if the error is retryable
pub fn is_retryable(&self) -> bool {
    matches!(
        self,
        GraphBitError::Network { .. }
            | GraphBitError::RateLimit { .. }
            | GraphBitError::LlmProvider { .. }
            | GraphBitError::Llm { .. }
    )
}

/// Get retry delay in seconds for retryable errors
pub fn retry_delay(&self) -> Option<u64> {
    match self {
        GraphBitError::RateLimit {
            retry_after_seconds,
            ..
        } => Some(*retry_after_seconds),
        GraphBitError::Network { .. } => Some(1),
        GraphBitError::LlmProvider { .. } => Some(2),
        GraphBitError::Llm { .. } => Some(1),
        _ => None,
    }
}
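To make the message-based classification idea concrete, here is a minimal Python sketch. The excerpt does not show the body of from_error, so the keyword table, the reduced set of error types, and the function names classify and retry_delay_seconds are all illustrative assumptions rather than GraphBit's actual rules.

```python
from enum import Enum
from typing import Optional


class RetryableErrorType(Enum):
    NETWORK = "network"
    TIMEOUT = "timeout"
    RATE_LIMIT = "rate_limit"
    OTHER = "other"


# Hypothetical keyword table: the real matching rules are not shown in the excerpt.
_KEYWORDS = [
    ("rate limit", RetryableErrorType.RATE_LIMIT),
    ("timeout", RetryableErrorType.TIMEOUT),
    ("timed out", RetryableErrorType.TIMEOUT),
    ("connection", RetryableErrorType.NETWORK),
    ("network", RetryableErrorType.NETWORK),
]


def classify(message: str) -> RetryableErrorType:
    """Lowercase the error message and return the first matching error type."""
    text = message.lower()
    for keyword, error_type in _KEYWORDS:
        if keyword in text:
            return error_type
    return RetryableErrorType.OTHER


def retry_delay_seconds(error_type: RetryableErrorType,
                        retry_after: Optional[int] = None) -> Optional[int]:
    """Rate limits honor the server-provided hint; transient errors use a short fixed delay."""
    if error_type is RetryableErrorType.RATE_LIMIT:
        return retry_after
    if error_type in (RetryableErrorType.NETWORK, RetryableErrorType.TIMEOUT):
        return 1
    return None  # not retryable
```

String matching like this is simple but brittle, which is presumably why the typed GraphBitError variants drive is_retryable and retry_delay directly.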

2. Adaptive Retry Policies with Exponential Backoff + Jitter

The actual implementation in core/src/types.rs shows GraphBit's sophisticated retry logic:

/// Calculate delay for a given attempt with exponential backoff and jitter
pub fn calculate_delay(&self, attempt: u32) -> u64 {
    if attempt == 0 {
        return 0;
    }

    let base_delay = (self.initial_delay_ms as f64
        * self.backoff_multiplier.powi(attempt as i32 - 1))
    .min(self.max_delay_ms as f64);

    // Add jitter to prevent thundering herd
    let jitter = if self.jitter_factor > 0.0 {
        let max_jitter = base_delay * self.jitter_factor;
        use rand::Rng;
        let mut rng = rand::thread_rng();
        rng.gen_range(-max_jitter..=max_jitter)
    } else {
        0.0
    };

    ((base_delay + jitter).max(0.0) as u64).min(self.max_delay_ms)
}

Production Configuration Example:

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_attempts: 3,
            initial_delay_ms: 1000,
            backoff_multiplier: 2.0,
            max_delay_ms: DEFAULT_TIMEOUT_MS,
            jitter_factor: 0.1, // 10% jitter prevents thundering herd
            retryable_errors: vec![
                RetryableErrorType::NetworkError,
                RetryableErrorType::TimeoutError,
                RetryableErrorType::TemporaryUnavailable,
                RetryableErrorType::InternalServerError,
            ],
        }
    }
}
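To see what these defaults produce, the following Python sketch mirrors the calculate_delay arithmetic. The value of DEFAULT_TIMEOUT_MS is not shown in the excerpt, so the 30,000 ms cap used here is an assumption, as is the rng parameter added to make the jitter source swappable.

```python
import random


def calculate_delay(attempt, initial_delay_ms=1000, backoff_multiplier=2.0,
                    max_delay_ms=30_000, jitter_factor=0.1, rng=random):
    """Delay in milliseconds before retry number `attempt` (attempt 0 means no delay)."""
    if attempt == 0:
        return 0
    # Exponential growth, capped at max_delay_ms (assumed value, see lead-in).
    base = min(initial_delay_ms * backoff_multiplier ** (attempt - 1), max_delay_ms)
    # Symmetric jitter of +/- jitter_factor around the base delay.
    if jitter_factor > 0:
        jitter = rng.uniform(-base * jitter_factor, base * jitter_factor)
    else:
        jitter = 0.0
    return min(int(max(base + jitter, 0.0)), max_delay_ms)


# With jitter disabled the schedule is deterministic.
print([calculate_delay(a, jitter_factor=0.0) for a in range(5)])
# prints [0, 1000, 2000, 4000, 8000]
```

With the default 10% jitter, the first retry lands somewhere between 900 and 1100 ms, the second between 1800 and 2200 ms, and so on.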

3. Circuit Breaker State Management: Closed → Open → Half-Open

GraphBit implements the full circuit breaker pattern in python/src/llm/client.rs:

/// Circuit breaker state for resilience
#[derive(Debug, Clone)]
enum CircuitBreakerState {
    Closed { failure_count: u32 },
    Open { opened_at: Instant },
    HalfOpen,
}

impl CircuitBreaker {
    async fn can_execute(&self) -> bool {
        if !self.config.circuit_breaker_enabled {
            return true;
        }

        let mut state = self.state.write().await;
        match *state {
            CircuitBreakerState::Closed { .. } => true,
            CircuitBreakerState::Open { opened_at } => {
                if opened_at.elapsed() > self.config.circuit_breaker_recovery_timeout {
                    *state = CircuitBreakerState::HalfOpen;
                    true
                } else {
                    false
                }
            }
            CircuitBreakerState::HalfOpen => true,
        }
    }

    async fn record_failure(&self) {
        let mut state = self.state.write().await;
        match *state {
            CircuitBreakerState::Closed { failure_count } => {
                let new_count = failure_count + 1;
                if new_count >= self.config.circuit_breaker_threshold {
                    *state = CircuitBreakerState::Open {
                        opened_at: Instant::now(),
                    };
                    if self.config.debug {
                        warn!("Circuit breaker opened due to {} failures", new_count);
                    }
                } else {
                    *state = CircuitBreakerState::Closed {
                        failure_count: new_count,
                    };
                }
            }
            CircuitBreakerState::HalfOpen => {
                *state = CircuitBreakerState::Open {
                    opened_at: Instant::now(),
                };
                if self.config.debug {
                    warn!("Circuit breaker reopened after failed recovery attempt");
                }
            }
            _ => {}
        }
    }
}
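The state transitions are easier to follow with a controllable clock. The Python sketch below reproduces the Closed, Open, Half-Open logic of can_execute and record_failure; the injectable clock parameter is an addition for illustration, and record_success is an assumption (reset to Closed) because that method is not shown in the excerpt.

```python
import time


class CircuitBreaker:
    """Closed -> Open -> Half-Open state machine with an injectable clock."""

    def __init__(self, threshold=5, recovery_timeout=60.0, clock=None):
        self.threshold = threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock or time.monotonic
        self.state = "closed"
        self.failure_count = 0
        self.opened_at = None

    def can_execute(self):
        if self.state == "open":
            if self.clock() - self.opened_at > self.recovery_timeout:
                self.state = "half_open"  # allow a probe request through
                return True
            return False
        return True  # closed and half_open both admit requests

    def record_failure(self):
        if self.state == "closed":
            self.failure_count += 1
            if self.failure_count >= self.threshold:
                self._open()
        elif self.state == "half_open":
            self._open()  # the probe failed, so reopen

    def record_success(self):
        # Assumption: any success closes the breaker and clears the counter.
        self.state = "closed"
        self.failure_count = 0

    def _open(self):
        self.state = "open"
        self.opened_at = self.clock()


# Walk through one outage using a fake clock.
now = [0.0]
breaker = CircuitBreaker(threshold=3, recovery_timeout=60.0, clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
print(breaker.state, breaker.can_execute())  # prints: open False
now[0] = 61.0
print(breaker.can_execute(), breaker.state)  # prints: True half_open
```

While the breaker is open, every call to can_execute returns immediately without touching the provider, which is where the saved API calls come from.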

4. Production Resilience Features: Cost Efficiency & Compute Waste Prevention

Production Client Configuration (python/src/llm/client.rs):

impl Default for ClientConfig {
    fn default() -> Self {
        Self {
            request_timeout: Duration::from_secs(120), // Increased to match Ollama's capabilities
            max_retries: 3,
            base_retry_delay: Duration::from_millis(100),
            max_retry_delay: Duration::from_secs(5),
            circuit_breaker_enabled: true,
            circuit_breaker_threshold: 5,
            circuit_breaker_recovery_timeout: Duration::from_secs(60),
            debug: false,
        }
    }
}

Resilient Request Execution that prevents the 30-40% compute waste:

/// Execute a single request with retry logic and circuit breaker
async fn execute_request_with_resilience(
    provider: Arc<RwLock<Box<dyn LlmProviderTrait>>>,
    circuit_breaker: Arc<CircuitBreaker>,
    stats: Arc<RwLock<ClientStats>>,
    config: ClientConfig,
    request: LlmRequest,
) -> Result<graphbit_core::llm::LlmResponse, PyErr> {
    // Check circuit breaker
    if !circuit_breaker.can_execute().await {
        return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
            "Circuit breaker is open, request rejected",
        ));
    }

    let mut last_error = None;
    let mut delay = config.base_retry_delay;

    for attempt in 0..=config.max_retries {
        match timeout(config.request_timeout, provider_request).await {
            Ok(Ok(response)) => {
                circuit_breaker.record_success().await;
                return Ok(response);
            }
            Ok(Err(e)) => {
                circuit_breaker.record_failure().await;
                last_error = Some(to_py_error(e));
            }
            Err(_) => {
                // Timeout occurred
                circuit_breaker.record_failure().await;
                last_error = Some(timeout_error(
                    "llm_request",
                    config.request_timeout.as_millis() as u64,
                    "Request timed out",
                ));
            }
        }

        // Exponential backoff, capped at max_retry_delay (no jitter is applied in this loop)
        if attempt < config.max_retries {
            tokio::time::sleep(delay).await;
            delay = (delay * 2).min(config.max_retry_delay);
        }
    }
    // ...
}
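As a rough illustration of what the ClientConfig defaults imply for this loop, the Python sketch below computes the sleep schedule and a worst-case wall-clock bound. The function names backoff_schedule and worst_case_seconds are hypothetical, and the bound assumes every attempt runs into the full request timeout.

```python
def backoff_schedule(max_retries=3, base_delay_ms=100, max_delay_ms=5000):
    """Sleep durations (ms) between attempts: doubled each time, capped at max_delay_ms."""
    delays = []
    delay = base_delay_ms
    for attempt in range(max_retries + 1):
        if attempt < max_retries:  # no sleep after the final attempt
            delays.append(delay)
            delay = min(delay * 2, max_delay_ms)
    return delays


def worst_case_seconds(request_timeout_s=120, **schedule_kwargs):
    """Upper bound on total time if every attempt times out."""
    delays = backoff_schedule(**schedule_kwargs)
    attempts = len(delays) + 1
    return attempts * request_timeout_s + sum(delays) / 1000


print(backoff_schedule())                # prints [100, 200, 400]
print(backoff_schedule(max_retries=8))   # prints [100, 200, 400, 800, 1600, 3200, 5000, 5000]
```

Under the defaults that is four attempts and 0.7 seconds of sleeping, so a fully timed-out request can occupy a worker for roughly eight minutes. The timeouts, not the backoff, dominate the worst case.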

5. Python API Usage Examples

Production Configuration (docs/api-reference/configuration.md):

def create_prod_config():
    """Configuration for production environment."""
    # Initialize without debugging
    init(debug=False, log_level="warn")

    # High-quality model for production
    config = LlmConfig.openai(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )

    # High-throughput executor for production
    executor = Executor.new_high_throughput(
        config,
        timeout_seconds=300,
        debug=False
    )

    # Configure for production reliability
    executor.configure(
        timeout_seconds=300,
        max_retries=3,
        enable_metrics=True,
        debug=False
    )

    return executor

Circuit Breaker Usage (docs/user-guide/reliability.md):

def create_circuit_breaker_executor():
    """Create executor with circuit breaker protection."""
    # Base executor
    llm_config = LlmConfig.openai(
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )
    base_executor = Executor(llm_config)

    # Circuit breaker configuration
    circuit_breaker = CircuitBreaker(
        failure_threshold=3,  # Open after 3 failures
        recovery_timeout=30,  # Try again after 30 seconds
        timeout_seconds=60    # Individual execution timeout
    )

    reliable_executor = ReliableExecutor(base_executor, circuit_breaker)
    return reliable_executor

6. Cost Efficiency: Model Selection & Pricing

GraphBit includes actual cost tracking in core/src/llm/openai.rs:

fn cost_per_token(&self) -> Option<(f64, f64)> {
    // Cost per token in USD (input, output) as of late 2023
    match self.model.as_str() {
        "gpt-4" => Some((0.00003, 0.00006)),
        "gpt-4-32k" => Some((0.00006, 0.00012)),
        "gpt-4-turbo" => Some((0.00001, 0.00003)),
        "gpt-4o" => Some((0.000005, 0.000015)),
        "gpt-4o-mini" => Some((0.00000015, 0.0000006)), // Most cost-effective
        "gpt-3.5-turbo" => Some((0.0000015, 0.000002)),
        "gpt-3.5-turbo-16k" => Some((0.000003, 0.000004)),
        _ => None,
    }
}
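A per-token table like this turns into a request cost with one multiplication per direction. The Python sketch below copies a few of the rates from the table above; the estimate_cost helper is a hypothetical name for illustration, and the rates themselves may no longer match current provider pricing.

```python
from typing import Optional

# (input, output) USD per token, copied from the cost_per_token table above.
COST_PER_TOKEN = {
    "gpt-4": (0.00003, 0.00006),
    "gpt-4o": (0.000005, 0.000015),
    "gpt-4o-mini": (0.00000015, 0.0000006),
    "gpt-3.5-turbo": (0.0000015, 0.000002),
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> Optional[float]:
    """Return the estimated USD cost of one request, or None for an unknown model."""
    rates = COST_PER_TOKEN.get(model)
    if rates is None:
        return None
    input_rate, output_rate = rates
    return input_tokens * input_rate + output_tokens * output_rate


for model in ("gpt-4", "gpt-4o-mini"):
    print(model, estimate_cost(model, input_tokens=1000, output_tokens=500))
```

For a request with 1,000 input tokens and 500 output tokens this works out to about $0.06 on gpt-4 versus about $0.00045 on gpt-4o-mini, so every retried request on the larger model costs over a hundred times more.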

7. Tunable Configuration Options

Developers can configure resilience behavior through multiple parameters:

/// Configure exponential backoff
pub fn with_exponential_backoff(
    mut self,
    initial_delay_ms: u64,
    multiplier: f64,
    max_delay_ms: u64,
) -> Self {
    self.initial_delay_ms = initial_delay_ms;
    self.backoff_multiplier = multiplier;
    self.max_delay_ms = max_delay_ms;
    self
}

/// Configure jitter factor
pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
    self.jitter_factor = jitter_factor.clamp(0.0, 1.0);
    self
}

/// Configure which errors should trigger retries
pub fn with_retryable_errors(mut self, errors: Vec<RetryableErrorType>) -> Self {
    self.retryable_errors = errors;
    self
}

Key Business Impact Features

  1. Prevents 30-40% Compute Waste: Once the failure threshold is reached, the circuit breaker rejects requests immediately while the provider is down, preventing wasted API calls and costs.
  2. Intelligent Error Classification: Different error types get different retry strategies. Rate-limit errors wait for the provider-supplied retry_after_seconds, while network and LLM errors retry after a short fixed delay of 1-2 seconds.
  3. Jitter Prevents Thundering Herd: Random jitter in retry delays prevents all clients from retrying simultaneously.
  4. Cost-Aware Model Selection: Built-in cost tracking helps developers choose the most cost-effective models (like gpt-4o-mini at $0.00000015 per input token, or $0.15 per million input tokens).
  5. Production-Ready Defaults: Sensible defaults (3 retries, 10% jitter, 60s circuit breaker recovery) work out of the box for production workloads.
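The thundering-herd point can be checked with a small simulation. This Python sketch is not taken from GraphBit; retry_times_ms is a hypothetical helper that computes when a group of clients would retry after a shared failure, with and without the 10% jitter.

```python
import random


def retry_times_ms(num_clients, base_delay_ms=1000.0, jitter_factor=0.1, seed=0):
    """Retry time for each client after a shared failure at t=0."""
    rng = random.Random(seed)  # seeded so the simulation is repeatable
    spread = base_delay_ms * jitter_factor
    return [base_delay_ms + rng.uniform(-spread, spread) for _ in range(num_clients)]


no_jitter = retry_times_ms(100, jitter_factor=0.0)
with_jitter = retry_times_ms(100, jitter_factor=0.1)

# Without jitter every client retries at the same instant.
print(len(set(no_jitter)))
# With jitter the retries are spread across a 200 ms window.
print(min(with_jitter) >= 900.0, max(with_jitter) <= 1100.0)
```

Without jitter all 100 clients hit the recovering provider in the same millisecond. With 10% jitter the same load is spread over the 900 to 1100 ms window.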

This implementation demonstrates how GraphBit's "unglamorous but critical work" translates into measurable business value: reduced costs, improved reliability, and predictable performance for production AI systems handling thousands of API calls daily.

For posting purposes, the high-level code examples below can be used:

Production-Grade Resilience Engineering: Architectural Patterns & Implementation Examples

Here are realistic, production-quality code implementations that demonstrate enterprise-grade resilience engineering patterns:

1. Circuit Breaker Pattern - Production Implementation

from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, Any, Dict
import time
import asyncio
import logging
from contextlib import asynccontextmanager


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5
    recovery_timeout_seconds: int = 60
    success_threshold: int = 3
    timeout_seconds: int = 30


class CircuitBreakerMetrics:
    def __init__(self):
        self.total_requests = 0
        self.failed_requests = 0
        self.successful_requests = 0
        self.circuit_opens = 0
        self.last_failure_time: Optional[float] = None

    @property
    def failure_rate(self) -> float:
        if self.total_requests == 0:
            return 0.0
        return self.failed_requests / self.total_requests


class ProductionCircuitBreaker:
    """
    Production-grade circuit breaker with comprehensive monitoring and logging.
    Prevents cascading failures in distributed systems.
    """

    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[float] = None
        self.metrics = CircuitBreakerMetrics()
        self.logger = logging.getLogger(f"circuit_breaker.{name}")

    def can_execute(self) -> bool:
        """Check if request should be allowed through the circuit breaker."""
        current_time = time.time()

        if self.state == CircuitState.CLOSED:
            return True
        elif self.state == CircuitState.OPEN:
            if (self.last_failure_time and
                    current_time - self.last_failure_time >= self.config.recovery_timeout_seconds):
                self._transition_to_half_open()
                return True
            return False
        elif self.state == CircuitState.HALF_OPEN:
            return True

        return False

    def record_success(self):
        """Record successful operation and update circuit state."""
        self.metrics.successful_requests += 1
        self.metrics.total_requests += 1

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self._transition_to_closed()
        elif self.state == CircuitState.CLOSED:
            self.failure_count = 0

    def record_failure(self):
        """Record failed operation and update circuit state."""
        self.metrics.failed_requests += 1
        self.metrics.total_requests += 1
        self.last_failure_time = time.time()
        self.metrics.last_failure_time = self.last_failure_time

        if self.state == CircuitState.CLOSED:
            self.failure_count += 1
            if self.failure_count >= self.config.failure_threshold:
                self._transition_to_open()
        elif self.state == CircuitState.HALF_OPEN:
            self._transition_to_open()

    def _transition_to_open(self):
        """Transition circuit breaker to OPEN state."""
        self.state = CircuitState.OPEN
        self.metrics.circuit_opens += 1
        self.logger.warning(
            f"Circuit breaker '{self.name}' opened after {self.failure_count} failures. "
            f"Failure rate: {self.metrics.failure_rate:.2%}"
        )

    def _transition_to_half_open(self):
        """Transition circuit breaker to HALF_OPEN state."""
        self.state = CircuitState.HALF_OPEN
        self.success_count = 0
        self.logger.info(f"Circuit breaker '{self.name}' transitioned to HALF_OPEN")

    def _transition_to_closed(self):
        """Transition circuit breaker to CLOSED state."""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.logger.info(f"Circuit breaker '{self.name}' closed after recovery")

    def get_metrics(self) -> Dict[str, Any]:
        """Get comprehensive circuit breaker metrics."""
        return {
            "name": self.name,
            "state": self.state.value,
            "total_requests": self.metrics.total_requests,
            "successful_requests": self.metrics.successful_requests,
            "failed_requests": self.metrics.failed_requests,
            "failure_rate": self.metrics.failure_rate,
            "circuit_opens": self.metrics.circuit_opens,
            "current_failure_count": self.failure_count,
            "last_failure_time": self.metrics.last_failure_time
        }


@asynccontextmanager
async def circuit_breaker_context(circuit_breaker: ProductionCircuitBreaker):
    """Context manager for circuit breaker execution."""
    if not circuit_breaker.can_execute():
        raise CircuitBreakerOpenError(f"Circuit breaker '{circuit_breaker.name}' is open")

    try:
        yield
        circuit_breaker.record_success()
    except Exception as e:
        circuit_breaker.record_failure()
        raise


class CircuitBreakerOpenError(Exception):
    """Raised when circuit breaker is open and requests are rejected."""
    pass

2. Adaptive Retry Logic with Exponential Backoff + Jitter

import random
import asyncio
from typing import List, Type, Union, Callable, Any, Dict, Optional
from dataclasses import dataclass
import logging


@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay_seconds: float = 1.0
    max_delay_seconds: float = 60.0
    exponential_base: float = 2.0
    jitter_factor: float = 0.1
    retryable_exceptions: Optional[List[Type[Exception]]] = None

    def __post_init__(self):
        if self.retryable_exceptions is None:
            self.retryable_exceptions = [
                ConnectionError,
                TimeoutError,
                IOError,
                # Add domain-specific exceptions
            ]


class RetryMetrics:
    def __init__(self):
        self.total_attempts = 0
        self.successful_operations = 0
        self.failed_operations = 0
        self.retry_attempts = 0
        self.average_attempts_per_operation = 0.0

    def record_attempt(self):
        self.total_attempts += 1

    def record_success(self, attempts_used: int):
        self.successful_operations += 1
        self.retry_attempts += (attempts_used - 1)
        self._update_average()

    def record_failure(self, attempts_used: int):
        self.failed_operations += 1
        self.retry_attempts += (attempts_used - 1)
        self._update_average()

    def _update_average(self):
        total_ops = self.successful_operations + self.failed_operations
        if total_ops > 0:
            self.average_attempts_per_operation = self.total_attempts / total_ops


class ProductionRetryHandler:
    """
    Production-grade retry handler with intelligent backoff strategies.
    Prevents thundering herd problems and provides comprehensive monitoring.
    """

    def __init__(self, name: str, config: RetryConfig):
        self.name = name
        self.config = config
        self.metrics = RetryMetrics()
        self.logger = logging.getLogger(f"retry_handler.{name}")

    def calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter."""
        if attempt <= 1:
            return 0

        # Exponential backoff
        delay = min(
            self.config.base_delay_seconds * (self.config.exponential_base ** (attempt - 2)),
            self.config.max_delay_seconds
        )

        # Add jitter to prevent thundering herd
        if self.config.jitter_factor > 0:
            jitter_range = delay * self.config.jitter_factor
            jitter = random.uniform(-jitter_range, jitter_range)
            delay = max(0, delay + jitter)

        return delay

    def is_retryable(self, exception: Exception) -> bool:
        """Determine if an exception should trigger a retry."""
        return any(
            isinstance(exception, exc_type)
            for exc_type in self.config.retryable_exceptions
        )

    async def execute_with_retry(self, operation: Callable, *args, **kwargs) -> Any:
        """Execute operation with retry logic."""
        last_exception = None

        for attempt in range(1, self.config.max_attempts + 1):
            self.metrics.record_attempt()

            try:
                self.logger.debug(f"Executing {self.name} (attempt {attempt})")
                result = await self._execute_operation(operation, *args, **kwargs)

                self.metrics.record_success(attempt)
                if attempt > 1:
                    self.logger.info(f"Operation {self.name} succeeded on attempt {attempt}")

                return result

            except Exception as e:
                last_exception = e

                if not self.is_retryable(e):
                    self.logger.error(f"Non-retryable error in {self.name}: {e}")
                    self.metrics.record_failure(attempt)
                    raise

                if attempt == self.config.max_attempts:
                    self.logger.error(f"Operation {self.name} failed after {attempt} attempts: {e}")
                    self.metrics.record_failure(attempt)
                    raise

                delay = self.calculate_delay(attempt)
                self.logger.warning(
                    f"Attempt {attempt} failed for {self.name}: {e}. "
                    f"Retrying in {delay:.2f}s"
                )

                if delay > 0:
                    await asyncio.sleep(delay)

        # This should never be reached, but included for completeness
        raise last_exception

    async def _execute_operation(self, operation: Callable, *args, **kwargs) -> Any:
        """Execute the operation, handling both sync and async callables."""
        if asyncio.iscoroutinefunction(operation):
            return await operation(*args, **kwargs)
        else:
            return operation(*args, **kwargs)

    def get_metrics(self) -> Dict[str, Any]:
        """Get comprehensive retry metrics."""
        return {
            "name": self.name,
            "total_attempts": self.metrics.total_attempts,
            "successful_operations": self.metrics.successful_operations,
            "failed_operations": self.metrics.failed_operations,
            "retry_attempts": self.metrics.retry_attempts,
            "average_attempts_per_operation": self.metrics.average_attempts_per_operation,
            "success_rate": (
                self.metrics.successful_operations /
                max(1, self.metrics.successful_operations + self.metrics.failed_operations)
            )
        }


# Decorator for easy retry application
def with_retry(config: RetryConfig, name: Optional[str] = None):
    """Decorator to add retry logic to functions."""
    def decorator(func):
        handler_name = name or f"{func.__module__}.{func.__name__}"
        retry_handler = ProductionRetryHandler(handler_name, config)

        async def async_wrapper(*args, **kwargs):
            return await retry_handler.execute_with_retry(func, *args, **kwargs)

        def sync_wrapper(*args, **kwargs):
            return asyncio.run(
                retry_handler.execute_with_retry(func, *args, **kwargs)
            )

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        else:
            return sync_wrapper

    return decorator

3. Intelligent Error Classification System

from

enum

import

Enum

from

typing

import

Dict

,

List

,

Optional

,

Type

,

Callable

from

dataclasses

import

dataclass

import

re

import

logging

class

ErrorSeverity

(

Enum

)

:

LOW

=

"low"

MEDIUM

=

"medium"

HIGH

=

"high"

CRITICAL

=

"critical"

class

ErrorCategory

(

Enum

)

:

NETWORK

=

"network"

AUTHENTICATION

=

"authentication"

AUTHORIZATION

=

"authorization"

RATE_LIMIT

=

"rate_limit"

TIMEOUT

=

"timeout"

VALIDATION

=

"validation"

RESOURCE_EXHAUSTION

=

"resource_exhaustion"

SYSTEM

=

"system"

UNKNOWN

=

"unknown"

@dataclass

class

ErrorClassification

:

category

:

ErrorCategory

severity

:

ErrorSeverity

is_retryable

:

bool

suggested_delay

:

float

max_retries

:

int

escalation_required

:

bool

=

False

class

ErrorPattern

:

def

__init__

(

self

,

            `pattern`
Enter fullscreen mode Exit fullscreen mode

:

str

,

            `classification`
Enter fullscreen mode Exit fullscreen mode

:

ErrorClassification

,

            `exception_types`
Enter fullscreen mode Exit fullscreen mode

:

List

[

Type

[

Exception

]

]

=

None

)

:

   `self`
Enter fullscreen mode Exit fullscreen mode

.

pattern

=

re

.

compile

(

pattern

,

re

.

IGNORECASE

)

   `self`
Enter fullscreen mode Exit fullscreen mode

.

classification

=

classification

   `self`
Enter fullscreen mode Exit fullscreen mode

.

exception_types

=

exception_types

or

[

]

class

ProductionErrorClassifier

:

"""

Production-grade error classification system for intelligent error handling.

Provides context-aware error categorization and response strategies.

"""

def

__init__

(

self

)

:

   `self`
Enter fullscreen mode Exit fullscreen mode

.

patterns

:

List

[

ErrorPattern

]

=

[

]

   `self`
Enter fullscreen mode Exit fullscreen mode

.

classification_cache

:

Dict

[

str

,

ErrorClassification

]

=

{

}

   `self`
Enter fullscreen mode Exit fullscreen mode

.

metrics

=

{

"classifications"

:

0

,

"cache_hits"

:

0

,

"category_counts"

:

{

category

.

value

:

0

for

category

in

ErrorCategory

}

}

   `self`
Enter fullscreen mode Exit fullscreen mode

.

logger

=

logging

.

getLogger

(

"error_classifier"

)

   `self`
Enter fullscreen mode Exit fullscreen mode

.

_initialize_default_patterns

(

)

    def _initialize_default_patterns(self):
        """Initialize default error classification patterns."""
        patterns = [
            # Network errors
            # Only ConnectionError is listed: PermissionError and TimeoutError are
            # subclasses of OSError, so listing OSError here would shadow the
            # authentication and timeout patterns during type matching.
            ErrorPattern(
                r"connection.*(?:refused|reset|timeout|failed)",
                ErrorClassification(
                    ErrorCategory.NETWORK, ErrorSeverity.MEDIUM,
                    is_retryable=True, suggested_delay=2.0, max_retries=3
                ),
                [ConnectionError]
            ),
            # Authentication errors
            ErrorPattern(
                r"(?:auth|unauthorized|invalid.*key|forbidden)",
                ErrorClassification(
                    ErrorCategory.AUTHENTICATION, ErrorSeverity.HIGH,
                    is_retryable=False, suggested_delay=0, max_retries=0,
                    escalation_required=True
                ),
                [PermissionError]
            ),
            # Rate limiting (matched by message only, no exception types)
            ErrorPattern(
                r"(?:rate.*limit|quota.*exceeded|throttle|too.*many.*requests)",
                ErrorClassification(
                    ErrorCategory.RATE_LIMIT, ErrorSeverity.MEDIUM,
                    is_retryable=True, suggested_delay=60.0, max_retries=5
                )
            ),
            # Timeout errors
            ErrorPattern(
                r"(?:timeout|timed.*out|deadline.*exceeded)",
                ErrorClassification(
                    ErrorCategory.TIMEOUT, ErrorSeverity.MEDIUM,
                    is_retryable=True, suggested_delay=5.0, max_retries=3
                ),
                [TimeoutError]
            ),
            # Resource exhaustion
            ErrorPattern(
                r"(?:memory|resource|capacity|limit.*exceeded|out.*of.*space)",
                ErrorClassification(
                    ErrorCategory.RESOURCE_EXHAUSTION, ErrorSeverity.HIGH,
                    is_retryable=True, suggested_delay=30.0, max_retries=2,
                    escalation_required=True
                ),
                [MemoryError]
            ),
            # Validation errors
            ErrorPattern(
                r"(?:invalid.*input|validation.*failed|bad.*request|malformed)",
                ErrorClassification(
                    ErrorCategory.VALIDATION, ErrorSeverity.LOW,
                    is_retryable=False, suggested_delay=0, max_retries=0
                ),
                [ValueError]
            ),
        ]
        self.patterns.extend(patterns)

    def classify_error(self,
                       exception: Exception,
                       context: Optional[Dict[str, Any]] = None) -> ErrorClassification:
        """Classify an error and return appropriate handling strategy."""
        error_key = f"{type(exception).__name__}:{str(exception)}"

        # Check cache first
        if error_key in self.classification_cache:
            self.metrics["cache_hits"] += 1
            return self.classification_cache[error_key]

        self.metrics["classifications"] += 1

        # Try exception type matching first
        for pattern in self.patterns:
            if any(isinstance(exception, exc_type) for exc_type in pattern.exception_types):
                classification = pattern.classification
                break
        else:
            # Fall back to message pattern matching
            error_message = str(exception)
            classification = None
            for pattern in self.patterns:
                if pattern.pattern.search(error_message):
                    classification = pattern.classification
                    break

            if classification is None:
                classification = ErrorClassification(
                    ErrorCategory.UNKNOWN, ErrorSeverity.MEDIUM,
                    is_retryable=True, suggested_delay=1.0, max_retries=1
                )

        # Apply context-based adjustments
        if context:
            classification = self._apply_context_adjustments(classification, context)

        # Cache the result. Note that the key ignores context, so the cached
        # entry carries whatever adjustments applied on the first classification.
        self.classification_cache[error_key] = classification
        self.metrics["category_counts"][classification.category.value] += 1

        self.logger.debug(
            f"Classified error as {classification.category.value} "
            f"(severity: {classification.severity.value}, retryable: {classification.is_retryable})"
        )
        return classification

    def _apply_context_adjustments(self,
                                   classification: ErrorClassification,
                                   context: Dict[str, Any]) -> ErrorClassification:
        """Apply context-specific adjustments to error classification."""
        # Adjust a shallow copy: the incoming object is shared with the pattern
        # list, and mutating it in place would change every later classification.
        import copy  # local import keeps this listing self-contained
        classification = copy.copy(classification)

        # Example context-based adjustments
        if context.get("is_critical_operation", False):
            if classification.severity == ErrorSeverity.MEDIUM:
                classification.severity = ErrorSeverity.HIGH
                classification.escalation_required = True

        if context.get("retry_count", 0) > 2:
            classification.suggested_delay *= 2  # Increase delay for repeated failures

        return classification

    def add_custom_pattern(self, pattern: ErrorPattern):
        """Add custom error pattern for domain-specific errors."""
        self.patterns.insert(0, pattern)  # Insert at beginning for priority
        self.logger.info(f"Added custom error pattern: {pattern.pattern.pattern}")

    def get_metrics(self) -> Dict[str, Any]:
        """Get error classification metrics."""
        # "classifications" counts cache misses only, so the hit rate is
        # computed over all lookups (hits + misses) to stay within [0, 1].
        total_lookups = self.metrics["cache_hits"] + self.metrics["classifications"]
        return {
            "total_classifications": self.metrics["classifications"],
            "cache_hits": self.metrics["cache_hits"],
            "cache_hit_rate": self.metrics["cache_hits"] / max(1, total_lookups),
            "category_distribution": self.metrics["category_counts"],
            "patterns_loaded": len(self.patterns),
        }

class ErrorHandler:
    """High-level error handler that integrates classification with response strategies."""

    def __init__(self, classifier: ProductionErrorClassifier):
        self.classifier = classifier
        self.logger = logging.getLogger("error_handler")

    async def handle_error(self,
                           exception: Exception,
                           context: Optional[Dict[str, Any]] = None) -> bool:
        """
        Handle an error based on its classification.

        Returns True if the operation should be retried, False otherwise.
        """
        classification = self.classifier.classify_error(exception, context)

        # Log the error with appropriate level
        log_message = f"Handling {classification.category.value} error: {exception}"

        if classification.severity == ErrorSeverity.CRITICAL:
            self.logger.critical(log_message)
        elif classification.severity == ErrorSeverity.HIGH:
            self.logger.error(log_message)
        elif classification.severity == ErrorSeverity.MEDIUM:
            self.logger.warning(log_message)
        else:
            self.logger.info(log_message)

        # Trigger escalation if required
        if classification.escalation_required:
            await self._escalate_error(exception, classification, context)

        return classification.is_retryable

    async def _escalate_error(self,
                              exception: Exception,
                              classification: ErrorClassification,
                              context: Optional[Dict[str, Any]]):
        """Escalate critical errors to monitoring/alerting systems."""
        escalation_data = {
            "error": str(exception),
            "category": classification.category.value,
            "severity": classification.severity.value,
            "context": context or {},
            "timestamp": time.time(),
        }

        # In production, this would integrate with your alerting system
        self.logger.critical(f"ESCALATION REQUIRED: {escalation_data}")
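The classifier above tries exception types before message patterns, which makes the order of the pattern list matter. The following is a minimal, self-contained sketch of that two-stage lookup, not the article's implementation: `Rule` and `classify` are hypothetical stand-ins for `ErrorPattern` and `classify_error`, and the categories are plain strings. It illustrates one thing worth checking in any such list: in Python, `PermissionError` and `TimeoutError` are subclasses of `OSError`, so a broad `OSError` rule placed first wins the type match.

```python
import re
from dataclasses import dataclass, field
from typing import List, Type


@dataclass
class Rule:
    """Hypothetical stand-in for ErrorPattern: category, regex, exception types."""
    category: str
    regex: re.Pattern
    exception_types: List[Type[BaseException]] = field(default_factory=list)


def classify(exc: BaseException, rules: List[Rule]) -> str:
    """Return the first rule's category matching by type, then by message."""
    # Stage 1: exception type matching, in list order
    for rule in rules:
        if any(isinstance(exc, t) for t in rule.exception_types):
            return rule.category
    # Stage 2: fall back to regex matching on the message
    message = str(exc)
    for rule in rules:
        if rule.regex.search(message):
            return rule.category
    return "unknown"


network = Rule("network", re.compile(r"connection.*(?:refused|reset)", re.I),
               [ConnectionError, OSError])
auth = Rule("authentication", re.compile(r"unauthorized|forbidden", re.I),
            [PermissionError])
rate = Rule("rate_limit", re.compile(r"rate.*limit|too.*many.*requests", re.I))

# PermissionError subclasses OSError, so a broad rule listed first shadows it.
broad_first = classify(PermissionError("forbidden"), [network, auth, rate])
specific_first = classify(PermissionError("forbidden"), [auth, network, rate])
# No type matches RuntimeError, so the message regex decides.
by_message = classify(RuntimeError("429 Too Many Requests"), [network, auth, rate])

print(broad_first, specific_first, by_message)
```

Listing the most specific exception types first, or keeping broad base classes out of the type lists entirely, avoids the shadowing shown by `broad_first`.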

4. Integrated Resilience Framework

import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ResilienceConfig:
    circuit_breaker: CircuitBreakerConfig
    retry: RetryConfig
    timeout_seconds: float = 30.0
    enable_metrics: bool = True
    enable_logging: bool = True

class ProductionResilienceFramework:
    """
    Integrated resilience framework combining circuit breaker, retry logic,
    error classification, and comprehensive monitoring.
    """

    def __init__(self, name: str, config: ResilienceConfig):
        self.name = name
        self.config = config

        # Initialize components
        self.circuit_breaker = ProductionCircuitBreaker(f"{name}_circuit", config.circuit_breaker)
        self.retry_handler = ProductionRetryHandler(f"{name}_retry", config.retry)
        self.error_classifier = ProductionErrorClassifier()
        self.error_handler = ErrorHandler(self.error_classifier)

        # Metrics and logging
        self.metrics = {
            "total_operations": 0,
            "successful_operations": 0,
            "failed_operations": 0,
            "circuit_breaker_rejections": 0,
            "average_response_time_ms": 0.0,
            "uptime_start": time.time(),
        }
        self.logger = logging.getLogger(f"resilience.{name}")

    async def execute(self,
                      operation: Callable,
                      *args,
                      context: Optional[Dict[str, Any]] = None,
                      **kwargs) -> Any:
        """
        Execute operation with full resilience protection.
        """
        start_time = time.time()
        self.metrics["total_operations"] += 1

        try:
            # Circuit breaker check
            async with circuit_breaker_context(self.circuit_breaker):
                # Execute with timeout and retry; the timeout bounds the whole
                # retry sequence, not each individual attempt
                result = await asyncio.wait_for(
                    self.retry_handler.execute_with_retry(operation, *args, **kwargs),
                    timeout=self.config.timeout_seconds
                )

                # Record success metrics
                duration_ms = (time.time() - start_time) * 1000
                self._update_success_metrics(duration_ms)
                return result

        except CircuitBreakerOpenError:
            self.metrics["circuit_breaker_rejections"] += 1
            self.logger.warning(f"Operation rejected by circuit breaker: {self.name}")
            raise

        except Exception as e:
            # Classify and handle the error
            should_retry = await self.error_handler.handle_error(e, context)
            self._update_failure_metrics()

            if not should_retry:
                self.logger.error(f"Non-retryable error in {self.name}: {e}")

            # The retry handler has already given up (or the timeout fired),
            # so the error is always propagated to the caller
            raise

    def _update_success_metrics(self, duration_ms: float):
        """Update success metrics with response time tracking."""
        self.metrics["successful_operations"] += 1

        # Update average response time (exponential moving average)
        alpha = 0.1  # Smoothing factor
        current_avg = self.metrics["average_response_time_ms"]
        self.metrics["average_response_time_ms"] = (
            alpha * duration_ms + (1 - alpha) * current_avg
        )

    def _update_failure_metrics(self):
        """Update failure metrics."""
        self.metrics["failed_operations"] += 1

    def get_health_status(self) -> Dict[str, Any]:
        """Get comprehensive health status of the resilience framework."""
        uptime_seconds = time.time() - self.metrics["uptime_start"]

        return {
            "name": self.name,
            "status": "healthy" if self.circuit_breaker.state == CircuitState.CLOSED else "degraded",
            "uptime_seconds": uptime_seconds,
            "metrics": {
                **self.metrics,
                "success_rate": self._calculate_success_rate(),
                "circuit_breaker": self.circuit_breaker.get_metrics(),
                "retry_handler": self.retry_handler.get_metrics(),
                "error_classifier": self.error_classifier.get_metrics(),
            },
        }

    def _calculate_success_rate(self) -> float:
        """Calculate overall success rate."""
        total_ops = self.metrics["successful_operations"] + self.metrics["failed_operations"]
        if total_ops == 0:
            return 1.0
        return self.metrics["successful_operations"] / total_ops

    def reset_metrics(self):
        """Reset all metrics (useful for testing or periodic resets)."""
        self.metrics = {
            "total_operations": 0,
            "successful_operations": 0,
            "failed_operations": 0,
            "circuit_breaker_rejections": 0,
            "average_response_time_ms": 0.0,
            "uptime_start": time.time(),
        }

        # Reset component metrics
        self.circuit_breaker.metrics = CircuitBreakerMetrics()
        self.retry_handler.metrics = RetryMetrics()

# Factory function for easy framework creation
def create_resilience_framework(name: str,
                                failure_threshold: int = 5,
                                max_retries: int = 3,
                                timeout_seconds: float = 30.0) -> ProductionResilienceFramework:
    """Create a resilience framework with sensible defaults."""
    config = ResilienceConfig(
        circuit_breaker=CircuitBreakerConfig(failure_threshold=failure_threshold),
        retry=RetryConfig(max_attempts=max_retries),
        timeout_seconds=timeout_seconds
    )
    return ProductionResilienceFramework(name, config)
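The response-time metric in this framework is an exponential moving average with a smoothing factor of 0.1 that starts from 0.0. As a rough illustration of what that implies, here is a small standalone sketch; the `ema` helper and its `initial` parameter are hypothetical and not part of the framework. It compares a zero-seeded average with one seeded from the first sample.

```python
from typing import Iterable, Optional


def ema(samples: Iterable[float], alpha: float = 0.1,
        initial: Optional[float] = 0.0) -> float:
    """Exponential moving average; initial=None seeds with the first sample."""
    avg = initial
    for x in samples:
        # Same update rule as the framework: alpha * new + (1 - alpha) * old
        avg = x if avg is None else alpha * x + (1 - alpha) * avg
    return 0.0 if avg is None else avg


steady = [100.0] * 10                      # ten requests of 100 ms each
zero_seeded = ema(steady)                  # starts at 0.0, like the metrics dict
first_seeded = ema(steady, initial=None)   # starts at the first observation

print(round(zero_seeded, 2), first_seeded)
```

With a zero seed, ten identical 100 ms samples still report only about 65 ms, because the average needs many samples to climb away from its starting value. Seeding from the first observation is one way to avoid under-reporting latency right after startup or after a metrics reset.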

5. Usage Examples - Production Integration

import asyncio
import aiohttp
from typing import Any, Callable, Dict


# Example: HTTP Client with Resilience
class ResilientHttpClient:
    """Production HTTP client with comprehensive resilience patterns."""

    def __init__(self):
        # Configure resilience for different operation types
        self.api_resilience = create_resilience_framework(
            "api_calls",
            failure_threshold=3,
            max_retries=3,
            timeout_seconds=30.0
        )
        self.batch_resilience = create_resilience_framework(
            "batch_operations",
            failure_threshold=5,
            max_retries=2,
            timeout_seconds=120.0
        )

        # Add custom error patterns for HTTP-specific errors
        self.api_resilience.error_classifier.add_custom_pattern(
            ErrorPattern(
                r"5\d{2}.*server.*error",
                ErrorClassification(
                    ErrorCategory.SYSTEM, ErrorSeverity.HIGH,
                    is_retryable=True, suggested_delay=10.0, max_retries=3
                )
            )
        )

    async def make_api_call(self, url: str, **kwargs) -> Dict[str, Any]:
        """Make API call with full resilience protection."""
        # "critical" is a flag for the resilience layer, not a request option,
        # so it is removed before the remaining kwargs reach session.get()
        is_critical = kwargs.pop("critical", False)

        async def _api_operation():
            async with aiohttp.ClientSession() as session:
                async with session.get(url, **kwargs) as response:
                    response.raise_for_status()
                    return await response.json()

        context = {
            "url": url,
            "is_critical_operation": is_critical,
        }

        return await self.api_resilience.execute(
            _api_operation,
            context=context
        )

    async def batch_process(self, items: list, processor_func: Callable) -> list:
        """Process items in batch with resilience."""

        async def _batch_operation():
            results = []
            for item in items:
                result = await processor_func(item)
                results.append(result)
            return results

        return await self.batch_resilience.execute(_batch_operation)

    def get_health_metrics(self) -> Dict[str, Any]:
        """Get comprehensive health metrics for monitoring."""
        return {
            "api_operations": self.api_resilience.get_health_status(),
            "batch_operations": self.batch_resilience.get_health_status(),
            "overall_status": self._calculate_overall_health(),
        }

    def _calculate_overall_health(self) -> str:
        """Calculate overall system health status."""
        api_healthy = self.api_resilience.circuit_breaker.state == CircuitState.CLOSED
        batch_healthy = self.batch_resilience.circuit_breaker.state == CircuitState.CLOSED

        if api_healthy and batch_healthy:
            return "healthy"
        elif api_healthy or batch_healthy:
            return "degraded"
        else:
            return "unhealthy"

# Example usage
async def main():
    client = ResilientHttpClient()

    try:
        # Make resilient API calls
        result = await client.make_api_call(
            "https://api.example.com/data",
            critical=True
        )
        print(f"API Result: {result}")

        # Process batch with resilience
        items = [1, 2, 3, 4, 5]

        async def process_item(item):
            # Simulate processing that might fail
            if item == 3:
                raise ConnectionError("Simulated network error")
            return item * 2

        batch_results = await client.batch_process(items, process_item)
        print(f"Batch Results: {batch_results}")

    except Exception as e:
        print(f"Operation failed: {e}")

    # Get health metrics for monitoring
    health = client.get_health_metrics()
    print(f"System Health: {health}")


if __name__ == "__main__":
    asyncio.run(main())
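One detail worth verifying when adding custom patterns like the 5xx pattern in the client above is case sensitivity. Whether `ErrorPattern` compiles its regex with `re.IGNORECASE` is not shown in this section, so the sketch below only checks the pattern string itself with Python's `re` module. The sample message is illustrative and is an assumption; real HTTP client libraries format their errors in their own way.

```python
import re

CUSTOM_5XX = r"5\d{2}.*server.*error"  # pattern string from the client example

# Illustrative message only; real client libraries format errors differently.
sample_message = "500, message='Internal Server Error'"

case_sensitive = re.compile(CUSTOM_5XX)
case_insensitive = re.compile(CUSTOM_5XX, re.IGNORECASE)

# Capitalised "Server Error" does not match the lowercase pattern by default
matched_sensitive = bool(case_sensitive.search(sample_message))
matched_insensitive = bool(case_insensitive.search(sample_message))
# An all-lowercase message matches either way
matched_lowercase = bool(case_sensitive.search("503 upstream server error"))

print(matched_sensitive, matched_insensitive, matched_lowercase)
```

If the patterns are compiled without `re.IGNORECASE`, a message with title-cased words would fall through to the next pattern or to the unknown category, so it is worth testing each custom pattern against real error strings from the client library in use.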

Key Architectural Benefits

  1. Separation of Concerns: Each component (circuit breaker, retry, error classification) is independently configurable and testable
  2. Comprehensive Monitoring: Built-in metrics collection for observability and alerting
  3. Context-Aware Decisions: Error handling adapts based on operation context and history
  4. Production-Ready Logging: Structured logging with appropriate severity levels
  5. Flexible Configuration: Easy to tune for different operation types and requirements
  6. Graceful Degradation: System continues operating even when some components are in degraded states
  7. Cost Efficiency: Intelligent backoff and circuit breaking prevent resource waste
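To make the backoff mentioned in the last item concrete, here is a minimal sketch of capped exponential backoff with optional "full jitter". It is an illustration under stated assumptions rather than GraphBit's retry implementation: the function name and the `base`, `factor`, `cap`, and `rng` parameters are hypothetical.

```python
import random
from typing import List, Optional


def backoff_delays(base: float = 1.0, factor: float = 2.0, cap: float = 60.0,
                   attempts: int = 5,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Capped exponential backoff; with rng, apply full jitter in [0, delay]."""
    delays = []
    for attempt in range(attempts):
        # Grow the delay geometrically, but never beyond the cap
        delay = min(cap, base * factor ** attempt)
        if rng is not None:
            # Full jitter: pick a random delay up to the computed ceiling
            delay = rng.uniform(0.0, delay)
        delays.append(delay)
    return delays


plain = backoff_delays(base=2.0, cap=10.0, attempts=4)
jittered = backoff_delays(base=2.0, cap=10.0, attempts=4, rng=random.Random(42))

print(plain)
print(jittered)
```

The cap bounds how long a single wait can grow, and the jitter spreads retries from many clients over time so they do not hit a recovering service in synchronized bursts.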

These patterns demonstrate enterprise-grade resilience engineering that prevents cascading failures, reduces operational costs, and provides the reliability needed for production AI systems handling thousands of requests daily.
