Hey Dev Community!
Welcome to one of the deepest dives into distributed systems you'll find!
Introduction: Why Time Is the Hardest Problem
Welcome back, distributed systems architects!
If you've ever tried to debug a race condition, wondered why your database timestamps don't match, or struggled with replica synchronization, you've encountered the fundamental challenge of time in distributed systems.
Today, we're diving deep into the theoretical foundations of time, ordering, and coordination. This isn't just about clocks; it's about causality, consistency, and the very nature of distributed computation.
1. Logical Clocks: Lamport & Vector Clocks
Lamport Clocks: The Foundation of Logical Time
Leslie Lamport's seminal 1978 paper introduced the idea that we don't need real-time clocks; we just need to agree on happens-before relationships.
class LamportClock:
def __init__(self, node_id):
self.node_id = node_id
self.counter = 0
def increment(self):
"""Increment counter for local event"""
self.counter += 1
return self.counter
def send_event(self):
"""Increment and return timestamp for sending message"""
self.counter += 1
return self.counter
def receive_event(self, received_timestamp):
"""Update clock based on received message timestamp"""
self.counter = max(self.counter, received_timestamp) + 1
return self.counter
    def compare(self, timestamp_a, timestamp_b):
        """
        Compare two Lamport timestamps
        Returns: -1 if a < b, 1 if a > b, 0 if equal (ordering unknown)
        """
        if timestamp_a < timestamp_b:
            return -1
        elif timestamp_a > timestamp_b:
            return 1
        else:
            # Equal counters: plain Lamport timestamps cannot order these
            # events; use (counter, node_id) pairs if a total order is needed
            return 0
def happens_before(self, event_a, event_b):
"""
Check if event_a happens before event_b
        Based on: a → b if:
        1. a and b are on the same process and a comes before b
        2. a is the send and b is the receive of the same message
        3. Transitivity: there exists c such that a → c and c → b
"""
# Implementation of the happens-before relation
pass
# Example: Distributed chat application
class DistributedChat:
def __init__(self, user_id):
self.clock = LamportClock(user_id)
self.messages = []
def send_message(self, content):
timestamp = self.clock.send_event()
message = {
'content': content,
'sender': self.clock.node_id,
'timestamp': timestamp,
'lamport_time': timestamp
}
self.messages.append(message)
return message
def receive_message(self, message):
# Update clock with received timestamp
self.clock.receive_event(message['lamport_time'])
# Insert in happens-before order
self._insert_ordered(message)
def _insert_ordered(self, message):
# Insert message maintaining causal order
for i, existing in enumerate(self.messages):
if self.clock.compare(message['lamport_time'],
existing['lamport_time']) < 0:
self.messages.insert(i, message)
return
self.messages.append(message)
Key Insight: Lamport clocks guarantee that if event A happens before event B, then A's timestamp < B's timestamp. But the converse isn't true: a smaller timestamp doesn't imply the event happened first, so Lamport clocks cannot distinguish concurrent events from causally ordered ones.
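To see this concretely, here's a minimal sketch (reusing the LamportClock class above) in which two genuinely concurrent events receive identical timestamps, while a message exchange produces ordered ones:

clock_a = LamportClock(node_id=0)
clock_b = LamportClock(node_id=1)

t1 = clock_a.increment()             # local event on A -> 1
t2 = clock_b.increment()             # concurrent local event on B -> 1
msg_ts = clock_a.send_event()        # A sends a message -> 2
t3 = clock_b.receive_event(msg_ts)   # B receives -> max(1, 2) + 1 = 3

print(t1, t2)      # 1 1: concurrent events, indistinguishable by timestamp
print(msg_ts, t3)  # 2 3: send happens-before receive, and the order shows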
Vector Clocks: Capturing Causality Precisely
Vector clocks extend Lamport clocks to detect concurrent events.
class VectorClock:
def __init__(self, node_id, num_nodes):
self.node_id = node_id
self.num_nodes = num_nodes
self.vector = [0] * num_nodes
def increment(self):
"""Increment own component"""
self.vector[self.node_id] += 1
return self.vector.copy()
def update(self, received_vector):
"""Merge with received vector clock (element-wise max)"""
for i in range(self.num_nodes):
self.vector[i] = max(self.vector[i], received_vector[i])
self.vector[self.node_id] += 1 # Increment for receive event
    def compare(self, vec_a, vec_b):
        """
        Compare two vector clocks
        Returns:
            -1 if vec_a < vec_b (happens before)
            0 if concurrent
            1 if vec_a > vec_b (happens after)
            2 if equal
        """
        less = False
        greater = False
        for i in range(self.num_nodes):
            if vec_a[i] < vec_b[i]:
                less = True
            elif vec_a[i] > vec_b[i]:
                greater = True
if less and not greater:
return -1 # self happens before other
elif greater and not less:
return 1 # self happens after other
elif not less and not greater:
return 2 # equal
else:
return 0 # concurrent
def is_causally_related(self, event_a, event_b):
"""Check if event_a causally precedes event_b"""
return self.compare(event_a['vector'], event_b['vector']) == -1
def get_concurrent_events(self, events):
"""Find all events concurrent with given event"""
concurrent = []
for event in events:
if self.compare(event['vector'], self.vector) == 0:
concurrent.append(event)
return concurrent
# Example: Version conflicts in distributed database
class VersionedDatabase:
def __init__(self, node_id, num_nodes):
self.clock = VectorClock(node_id, num_nodes)
self.data = {}
def write(self, key, value):
# Increment vector clock for write
timestamp = self.clock.increment()
version = {
'value': value,
'vector': timestamp.copy(),
'node': self.clock.node_id
}
if key not in self.data:
self.data[key] = []
# Check for conflicts with existing versions
conflicts = []
for existing in self.data[key]:
comp = self.clock.compare(timestamp, existing['vector'])
if comp == 0: # Concurrent
conflicts.append(existing)
if conflicts:
# Handle concurrent writes (application-specific)
version = self._resolve_conflict(version, conflicts)
self.data[key].append(version)
return version
def read(self, key):
# Return all non-obsolete versions
if key not in self.data:
return []
# Filter out versions that are definitely older
current_versions = []
for version in self.data[key]:
# Check if this version is not dominated by any other
is_current = True
for other in self.data[key]:
if (version != other and
self.clock.compare(version['vector'], other['vector']) == -1):
is_current = False
break
if is_current:
current_versions.append(version)
return current_versions
def _resolve_conflict(self, new_version, conflicts):
# Example conflict resolution: last writer wins with tie-breaker
# In practice, use application-specific logic
all_versions = conflicts + [new_version]
# Sort by vector clock (partial order)
all_versions.sort(key=lambda v: tuple(v['vector']), reverse=True)
return all_versions[0]
Vector Clock Properties:
- Causality Detection: Precisely identifies concurrent events
- Space Complexity: O(N) where N is number of nodes
- Merge Operation: Element-wise maximum (commutative, associative)
- Garbage Collection: Challenge in long-running systems
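As a quick sanity check of the causality-detection property, here is a short sketch using the VectorClock class above (a two-node system is assumed):

a = VectorClock(node_id=0, num_nodes=2)
b = VectorClock(node_id=1, num_nodes=2)

va = a.increment()              # A: [1, 0]
vb = b.increment()              # B: [0, 1], concurrent with A's event

print(a.compare(va, vb))        # 0: neither vector dominates -> concurrent

b.update(va)                    # B receives A's clock: element-wise max + increment
print(a.compare(va, b.vector))  # -1: A's event now happens-before B's state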
2. Hybrid Logical Clocks: Best of Both Worlds
Hybrid Logical Clocks (HLC) combine physical timestamps with logical counters to provide monotonicity and bounded skew.
import time
import struct
from typing import Tuple
class HybridLogicalClock:
"""
HLC = (physical_time, logical_counter, node_id)
Properties:
1. Monotonic: Always increases
2. Bounded skew: |HLC - physical_time| <= MAX_OFFSET
    3. Causal ordering: If event A → B, then HLC(A) < HLC(B)
"""
def __init__(self, node_id: int, max_offset: int = 10):
self.node_id = node_id
self.max_offset = max_offset # Maximum allowed clock skew (ms)
# Current HLC state
self.pt = 0 # Physical time component (milliseconds)
self.l = 0 # Logical counter
def now(self) -> Tuple[int, int, int]:
"""Get current HLC timestamp"""
current_pt = self._get_physical_time()
if current_pt > self.pt:
# Physical time has advanced
self.pt = current_pt
self.l = 0
else:
# Same physical time, increment logical counter
self.l += 1
return (self.pt, self.l, self.node_id)
def send(self) -> Tuple[int, int, int]:
"""HLC for sending a message"""
return self.now()
def receive(self, received_hlc: Tuple[int, int, int]) -> Tuple[int, int, int]:
"""Update HLC based on received timestamp"""
received_pt, received_l, received_node = received_hlc
current_pt = self._get_physical_time()
# Update rule for HLC
if current_pt > max(self.pt, received_pt):
# Our physical time is ahead of both
self.pt = current_pt
self.l = 0
elif received_pt > self.pt:
# Received timestamp is ahead
self.pt = received_pt
self.l = received_l + 1
elif received_pt == self.pt:
# Same physical time
self.l = max(self.l, received_l) + 1
else:
# Our clock is ahead of received
self.l += 1
# Bound check
if abs(self.pt - current_pt) > self.max_offset:
raise ClockSkewError(f"Clock skew {abs(self.pt - current_pt)} > {self.max_offset}")
return (self.pt, self.l, self.node_id)
def _get_physical_time(self) -> int:
"""Get current physical time in milliseconds"""
return int(time.time() * 1000)
def compare(self, hlc1: Tuple[int, int, int], hlc2: Tuple[int, int, int]) -> int:
"""Compare two HLC timestamps"""
pt1, l1, n1 = hlc1
pt2, l2, n2 = hlc2
if pt1 < pt2:
return -1
elif pt1 > pt2:
return 1
else:
# Same physical time
if l1 < l2:
return -1
elif l1 > l2:
return 1
else:
# Same logical counter
if n1 < n2:
return -1
elif n1 > n2:
return 1
else:
return 0 # Equal
def to_bytes(self, hlc: Tuple[int, int, int]) -> bytes:
"""Serialize HLC to bytes for storage/transmission"""
pt, l, node = hlc
return struct.pack('!QII', pt, l, node)
def from_bytes(self, data: bytes) -> Tuple[int, int, int]:
"""Deserialize HLC from bytes"""
return struct.unpack('!QII', data)
class ClockSkewError(Exception):
pass
# Example: Distributed transaction logging
class DistributedTransactionLog:
def __init__(self, node_id):
self.hlc = HybridLogicalClock(node_id)
self.log = []
def log_transaction(self, transaction):
timestamp = self.hlc.now()
entry = {
'transaction': transaction,
'hlc': timestamp,
'physical_time': time.time()
}
self.log.append(entry)
# Sort log by HLC (causal order)
self.log.sort(key=lambda x: x['hlc'])
return timestamp
def replicate_log(self, remote_entries):
"""Merge logs from another node"""
for entry in remote_entries:
# Update HLC with received timestamp
self.hlc.receive(entry['hlc'])
# Add to log if not already present
if not any(self.hlc.compare(e['hlc'], entry['hlc']) == 0
for e in self.log):
self.log.append(entry)
# Resort log
self.log.sort(key=lambda x: x['hlc'])
def get_ordered_transactions(self):
"""Get transactions in causal order"""
return [entry['transaction'] for entry in self.log]
HLC Advantages:
- Monotonic: Always increases
- Causal Ordering: Preserves happens-before
- Bounded Skew: Stays close to physical time
- Compact: Fixed size (16 bytes in the to_bytes encoding above)
- Commodity Clocks: Works on ordinary, loosely synchronized (e.g., NTP) clocks with no special hardware
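A small usage sketch of the HybridLogicalClock above, showing causal ordering across one message exchange (the physical components depend on the wall-clock time when you run it):

node_a = HybridLogicalClock(node_id=0)
node_b = HybridLogicalClock(node_id=1)

sent = node_a.send()             # (pt, l, 0) at A's current physical time
received = node_b.receive(sent)  # B merges A's timestamp into its own clock

# Causal ordering: the receive timestamp compares strictly after the send
assert node_b.compare(sent, received) == -1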
3. TrueTime: Google Spanner's Atomic Precision
TrueTime is Google's solution for globally consistent timestamps using atomic clocks and GPS.
import time
import random
from dataclasses import dataclass
from typing import Tuple
@dataclass
class TrueTimeInterval:
"""
TrueTime represents time as an interval [earliest, latest]
with guaranteed error bounds
"""
earliest: float # Lower bound (seconds since epoch)
latest: float # Upper bound
def midpoint(self) -> float:
return (self.earliest + self.latest) / 2
def error(self) -> float:
return self.latest - self.earliest
def contains(self, timestamp: float) -> bool:
return self.earliest <= timestamp <= self.latest
def definitely_before(self, other: 'TrueTimeInterval') -> bool:
return self.latest < other.earliest
def definitely_after(self, other: 'TrueTimeInterval') -> bool:
return self.earliest > other.latest
def concurrent(self, other: 'TrueTimeInterval') -> bool:
return not (self.definitely_before(other) or
self.definitely_after(other))
class TrueTimeAPI:
"""
Simulated TrueTime API as used in Google Spanner
Real implementation uses:
- Atomic clocks (cesium, rubidium)
- GPS receivers
- Time master servers
"""
def __init__(self, epsilon: float = 0.001):
"""
epsilon: maximum clock uncertainty (1ms default)
In production: 1-7ms for Spanner
"""
self.epsilon = epsilon
self.reference_time = time.time()
# Simulate clock drift
        self.drift_rate = random.uniform(-0.0001, 0.0001)  # ±100 ppm
def now(self) -> TrueTimeInterval:
"""
        Returns: [earliest, latest] with |latest - earliest| <= 2ε
"""
current = self._get_current_time()
error = self._get_current_error()
earliest = current - error
latest = current + error
return TrueTimeInterval(earliest, latest)
def _get_current_time(self) -> float:
"""Get current time with simulated drift"""
elapsed = time.time() - self.reference_time
drift = elapsed * self.drift_rate
return time.time() + drift
def _get_current_error(self) -> float:
"""
Calculate current error bound
Components:
        1. Clock uncertainty (ε)
2. Message delay uncertainty
3. Master clock synchronization error
"""
base_error = self.epsilon
# Simulate additional error sources
message_delay_error = random.uniform(0, 0.0005) # Up to 0.5ms
sync_error = random.uniform(0, 0.0002) # Up to 0.2ms
return base_error + message_delay_error + sync_error
    def after(self, timestamp: float) -> TrueTimeInterval:
        """
        Returns an interval guaranteed to lie entirely after the given time
        Used for commit waits in Spanner
        """
        now_interval = self.now()
        while now_interval.earliest <= timestamp:
            # Wait out the remaining uncertainty, then re-read the clock
            time.sleep(max(timestamp - now_interval.earliest, 0) + self.epsilon)
            now_interval = self.now()
        return now_interval
def sleep(self, duration: float) -> TrueTimeInterval:
"""Sleep for at least duration, return timestamp after sleep"""
time.sleep(duration + 2 * self.epsilon) # Wait extra for safety
return self.now()
# Example: Spanner-style distributed transactions
class SpannerTransaction:
"""
Two-phase commit with TrueTime for global ordering
Key idea: Use TrueTime to assign commit timestamps
that respect causal order across datacenters
"""
def __init__(self, truetime: TrueTimeAPI):
self.truetime = truetime
self.participants = []
self.start_time = self.truetime.now()
def prepare(self) -> bool:
"""Phase 1: Prepare all participants"""
prepared = []
for participant in self.participants:
try:
prepared.append(participant.prepare())
except Exception as e:
# Abort on any failure
self._abort()
return False
return all(prepared)
def commit(self) -> float:
"""Phase 2: Commit with TrueTime timestamp"""
if not self.prepare():
raise TransactionError("Prepare failed")
# Get commit timestamp with uncertainty
commit_interval = self.truetime.now()
# Ensure this commit is definitely after start
while not commit_interval.definitely_after(self.start_time):
commit_interval = self.truetime.after(self.start_time.latest)
# Commit timestamp is the midpoint
commit_timestamp = commit_interval.midpoint()
# Wait until commit timestamp is definitely in the past
self.truetime.after(commit_timestamp)
# Now safe to commit at all participants
for participant in self.participants:
participant.commit(commit_timestamp)
return commit_timestamp
def _abort(self):
"""Rollback all participants"""
for participant in self.participants:
participant.rollback()
class TransactionError(Exception):
pass
# Example: Global schema change with TrueTime
class GlobalSchemaChange:
"""
Using TrueTime to coordinate schema changes across datacenters
"""
def __init__(self, truetime: TrueTimeAPI):
self.truetime = truetime
self.change_time = None
def schedule_change(self, change_id: str, future_time: float):
"""
Schedule schema change for a future time
All datacenters will apply change at same logical time
"""
# Get current time interval
now = self.truetime.now()
# Ensure future_time is definitely in the future
if future_time <= now.latest:
future_time = now.latest + 2 * self.truetime.epsilon
self.change_time = future_time
return future_time
def should_apply_change(self, current_time: float) -> bool:
"""
Check if it's safe to apply the schema change
Returns True when current_time >= change_time
"""
if self.change_time is None:
return False
# Get current time with uncertainty
now_interval = self.truetime.now()
# Apply change only when we're definitely past the change time
return now_interval.definitely_after(
TrueTimeInterval(self.change_time, self.change_time)
)
def apply_change_safely(self, change_func):
"""
Wait until safe, then apply change
"""
if self.change_time is None:
return
# Wait until we're definitely past the change time
self.truetime.after(self.change_time)
# Now safe to apply change globally
change_func()
TrueTime Innovations:
- Explicit Uncertainty: Time represented as interval, not point
- Wait-free Coordination: the after() method for synchronization
- Global Ordering: Enables globally consistent timestamps
- Hardware Redundancy: Multiple atomic clocks and GPS receivers
- Software Monitoring: Continuous error bound verification
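A hedged sketch of commit wait using the simulated TrueTimeAPI above: pick a commit timestamp at the upper bound of now(), then block until it is guaranteed to be in the past on every replica:

tt = TrueTimeAPI(epsilon=0.001)

commit_interval = tt.now()
commit_ts = commit_interval.latest  # choose the commit timestamp

later = tt.after(commit_ts)         # commit wait: blocks on the order of 2*epsilon
assert later.earliest > commit_ts   # commit_ts is now definitely in the past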
4. Interval Tree Clocks: Dynamic Scalability
Interval Tree Clocks (ITC) solve the scaling problem of vector clocks for highly dynamic systems.
from typing import List, Tuple, Optional
import math
class ITCStamp:
"""
Interval Tree Clock stamp
Represents causal knowledge as a binary tree
Properties:
1. Dynamic: O(log n) size for n events
2. Fork/Join: Efficient for dynamic process creation
3. Monotonic: Always increases
4. Partial Order: Captures happens-before
"""
def __init__(self):
# Binary tree representation
self.tree = self._create_leaf(1) # Start with single unit
def _create_leaf(self, value: int):
return ('leaf', value)
def _create_node(self, left, right):
return ('node', left, right)
def event(self) -> 'ITCStamp':
"""Generate new event (increment stamp)"""
new_stamp = self._clone()
new_stamp._increment()
return new_stamp
def _increment(self):
"""Internal increment operation"""
if self.tree[0] == 'leaf':
value = self.tree[1]
self.tree = self._create_leaf(value + 1)
else:
# node case
_, left, right = self.tree
# Try to increment left, if full try right
left_stamp = ITCStamp()
left_stamp.tree = left
right_stamp = ITCStamp()
right_stamp.tree = right
if not left_stamp._is_full():
left_stamp._increment()
self.tree = self._create_node(left_stamp.tree, right)
else:
right_stamp._increment()
self.tree = self._create_node(left, right_stamp.tree)
def _is_full(self) -> bool:
"""Check if stamp is at maximum capacity"""
if self.tree[0] == 'leaf':
return self.tree[1] >= self._max_leaf_value()
else:
_, left, right = self.tree
left_stamp = ITCStamp()
left_stamp.tree = left
right_stamp = ITCStamp()
right_stamp.tree = right
return left_stamp._is_full() and right_stamp._is_full()
def _max_leaf_value(self) -> int:
"""Maximum value for a leaf node"""
return 2 # For binary trees
def fork(self) -> Tuple['ITCStamp', 'ITCStamp']:
"""
Fork into two independent stamps
Used when creating new processes
"""
        if self.tree[0] == 'leaf':
            value = self.tree[1]
            if value >= 2:
                # Split leaf into two nodes
                left = self._create_leaf(1)
                right = self._create_leaf(value - 1)
            else:
                # A unit leaf forks into two unit leaves in this simplified model
                left = self._create_leaf(1)
                right = self._create_leaf(1)
            self.tree = self._create_node(left, right)
# Now tree is a node, split it
_, left, right = self.tree
left_stamp = ITCStamp()
left_stamp.tree = left
right_stamp = ITCStamp()
right_stamp.tree = right
return left_stamp, right_stamp
def join(self, other: 'ITCStamp') -> 'ITCStamp':
"""
Join two stamps (merge knowledge)
Used when processes synchronize
"""
        if self.tree[0] == 'leaf' and other.tree[0] == 'leaf':
            value1 = self.tree[1]
            value2 = other.tree[1]
            result = ITCStamp()
            result.tree = result._create_leaf(value1 + value2)
            return result
elif self.tree[0] == 'node' and other.tree[0] == 'node':
_, left1, right1 = self.tree
_, left2, right2 = other.tree
left_stamp1 = ITCStamp()
left_stamp1.tree = left1
left_stamp2 = ITCStamp()
left_stamp2.tree = left2
right_stamp1 = ITCStamp()
right_stamp1.tree = right1
right_stamp2 = ITCStamp()
right_stamp2.tree = right2
joined_left = left_stamp1.join(left_stamp2)
joined_right = right_stamp1.join(right_stamp2)
result = ITCStamp()
result.tree = result._create_node(joined_left.tree, joined_right.tree)
return result
else:
# Mismatched types, normalize
normalized_self = self._normalize()
normalized_other = other._normalize()
return normalized_self.join(normalized_other)
def _normalize(self) -> 'ITCStamp':
"""Convert to normalized form"""
# Implementation of normalization algorithm
pass
def leq(self, other: 'ITCStamp') -> bool:
"""Check if self โค other in causal order"""
if self.tree[0] == 'leaf' and other.tree[0] == 'leaf':
return self.tree[1] <= other.tree[1]
elif self.tree[0] == 'node' and other.tree[0] == 'node':
_, left1, right1 = self.tree
_, left2, right2 = other.tree
left_stamp1 = ITCStamp()
left_stamp1.tree = left1
left_stamp2 = ITCStamp()
left_stamp2.tree = left2
right_stamp1 = ITCStamp()
right_stamp1.tree = right1
right_stamp2 = ITCStamp()
right_stamp2.tree = right2
return (left_stamp1.leq(left_stamp2) and
right_stamp1.leq(right_stamp2))
else:
# Mismatched, normalize first
return self._normalize().leq(other._normalize())
def concurrent(self, other: 'ITCStamp') -> bool:
"""Check if stamps are concurrent"""
return not (self.leq(other) or other.leq(self))
def _clone(self) -> 'ITCStamp':
"""Create a deep copy"""
import copy
new_stamp = ITCStamp()
new_stamp.tree = copy.deepcopy(self.tree)
return new_stamp
def encode(self) -> bytes:
"""Encode stamp to bytes for transmission"""
# Compact binary encoding
pass
@classmethod
def decode(cls, data: bytes) -> 'ITCStamp':
"""Decode stamp from bytes"""
pass
# Example: Dynamic microservices with ITC
class MicroserviceOrchestrator:
"""
Using ITC for causal tracking in dynamic microservices
Challenge: Services scale up/down dynamically
Vector clocks would require O(n) space
ITC provides O(log n) scaling
"""
def __init__(self):
self.global_stamp = ITCStamp()
self.services = {} # service_id -> ITCStamp
self.event_log = []
def create_service(self, service_id: str):
"""Create new service instance"""
if service_id in self.services:
raise ValueError(f"Service {service_id} already exists")
# Fork global knowledge to create new service stamp
service_stamp, new_global = self.global_stamp.fork()
self.services[service_id] = service_stamp
self.global_stamp = new_global
return service_stamp
def destroy_service(self, service_id: str):
"""Remove service instance"""
if service_id not in self.services:
return
# Join service knowledge back to global
service_stamp = self.services[service_id]
self.global_stamp = self.global_stamp.join(service_stamp)
del self.services[service_id]
def service_event(self, service_id: str, event_data: dict) -> ITCStamp:
"""Record event from service"""
if service_id not in self.services:
raise ValueError(f"Service {service_id} not found")
service_stamp = self.services[service_id]
# Generate new event stamp
new_stamp = service_stamp.event()
self.services[service_id] = new_stamp
# Log event with causal information
log_entry = {
'service': service_id,
'data': event_data,
'stamp': new_stamp,
'time': time.time()
}
self.event_log.append(log_entry)
return new_stamp
def synchronize_services(self, service_a: str, service_b: str):
"""Synchronize two services (e.g., RPC call)"""
stamp_a = self.services[service_a]
stamp_b = self.services[service_b]
# Exchange and merge knowledge
merged = stamp_a.join(stamp_b)
# Update both services
self.services[service_a] = merged
self.services[service_b] = merged
def get_causal_history(self, service_id: str) -> List[dict]:
"""Get all events that causally precede service's current state"""
service_stamp = self.services.get(service_id)
if not service_stamp:
return []
history = []
for entry in self.event_log:
if entry['stamp'].leq(service_stamp):
history.append(entry)
return history
def detect_concurrent_events(self, event1: dict, event2: dict) -> bool:
"""Check if two events are concurrent"""
return event1['stamp'].concurrent(event2['stamp'])
ITC Advantages for Dynamic Systems:
- O(log n) Space: Scales better than vector clocks
- Dynamic Process Creation: Efficient fork/join operations
- Garbage Collection: Built-in through tree normalization
- Compact Encoding: Efficient serialization
- Partial Order Maintenance: Preserves causal relationships
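A minimal usage sketch of the simplified ITCStamp above. Note that this toy binary-tree model is far cruder than real ITC identity/event trees, so it only illustrates the fork/event/join API shape:

seed = ITCStamp()
left, right = seed.fork()   # split the seed identity into two branches

e1 = left.event()           # record an event on the left branch
merged = e1.join(right)     # rejoin: merged knowledge dominates both inputs

print(e1.leq(merged))       # True
print(merged.leq(e1))       # False: merged strictly dominates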
5. Byzantine Fault Tolerance: Beyond Crash Faults
Byzantine faults include arbitrary, even malicious, behavior, not just crashes.
import hashlib
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
@dataclass
class ByzantineMessage:
sender: int
sequence: int
payload: Any
signature: bytes
def verify(self, public_keys: Dict[int, bytes]) -> bool:
"""Verify message signature"""
if self.sender not in public_keys:
return False
public_key = serialization.load_pem_public_key(
public_keys[self.sender]
)
message_data = json.dumps({
'sender': self.sender,
'sequence': self.sequence,
'payload': self.payload
}).encode()
try:
public_key.verify(
self.signature,
message_data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
        except Exception:
            return False
def sign(self, private_key: rsa.RSAPrivateKey) -> 'ByzantineMessage':
"""Sign message with private key"""
message_data = json.dumps({
'sender': self.sender,
'sequence': self.sequence,
'payload': self.payload
}).encode()
signature = private_key.sign(
message_data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
self.signature = signature
return self
class PracticalByzantineFaultTolerance:
"""
PBFT: Practical Byzantine Fault Tolerance
Tolerates f faulty nodes with 3f+1 total nodes
"""
def __init__(self, node_id: int, total_nodes: int):
self.node_id = node_id
self.total_nodes = total_nodes
self.faulty_limit = (total_nodes - 1) // 3
# State
self.view = 0 # Current view number
self.sequence = 0 # Sequence number for operations
self.log = [] # Committed operations
# Cryptographic keys
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
self.public_key_pem = self.private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Known public keys (in real system, from PKI)
self.public_keys = {}
def broadcast_prepare(self, operation: Any) -> ByzantineMessage:
"""Phase 1: Primary broadcasts PRE-PREPARE"""
self.sequence += 1
message = ByzantineMessage(
sender=self.node_id,
sequence=self.sequence,
payload={
'type': 'PRE-PREPARE',
'view': self.view,
'operation': operation,
'digest': self._hash_operation(operation)
},
signature=b''
)
return message.sign(self.private_key)
def receive_pre_prepare(self, message: ByzantineMessage) -> Optional[ByzantineMessage]:
"""Phase 2: Replicas send PREPARE"""
if not message.verify(self.public_keys):
return None
payload = message.payload
if payload['type'] != 'PRE-PREPARE':
return None
# Verify view and sequence
if payload['view'] != self.view:
return None
# Verify operation digest
if payload['digest'] != self._hash_operation(payload['operation']):
return None
# Send PREPARE message
prepare_message = ByzantineMessage(
sender=self.node_id,
            sequence=message.sequence,
payload={
'type': 'PREPARE',
'view': self.view,
'digest': payload['digest']
},
signature=b''
)
return prepare_message.sign(self.private_key)
    def collect_prepares(self, prepares: List[ByzantineMessage]) -> bool:
        """Phase 3: Check if we have 2f+1 matching PREPARE messages"""
        if len(prepares) < 2 * self.faulty_limit + 1:
return False
# Group by digest
digests = {}
for prepare in prepares:
if prepare.verify(self.public_keys):
digest = prepare.payload['digest']
digests[digest] = digests.get(digest, 0) + 1
# Check if any digest has 2f+1 votes
for digest, count in digests.items():
if count >= 2 * self.faulty_limit + 1:
return True
return False
def send_commit(self, digest: str) -> ByzantineMessage:
"""Phase 4: Send COMMIT after collecting enough PREPAREs"""
commit_message = ByzantineMessage(
sender=self.node_id,
sequence=self.sequence,
payload={
'type': 'COMMIT',
'view': self.view,
'digest': digest
},
signature=b''
)
return commit_message.sign(self.private_key)
def collect_commits(self, commits: List[ByzantineMessage]) -> Optional[Any]:
"""Phase 5: Execute after collecting 2f+1 COMMITs"""
if len(commits) < 2 * self.faulty_limit + 1:
return None
# Verify all commits are for same digest
digests = set()
valid_commits = []
for commit in commits:
if commit.verify(self.public_keys):
digests.add(commit.payload['digest'])
valid_commits.append(commit)
if len(digests) != 1 or len(valid_commits) < 2 * self.faulty_limit + 1:
return None
# Find the operation with this digest
digest = list(digests)[0]
operation = self._find_operation_by_digest(digest)
if operation:
# Execute operation
self.log.append({
'sequence': self.sequence,
'operation': operation,
'digest': digest,
'view': self.view
})
return operation
return None
def view_change(self):
"""Handle view change when primary is suspected faulty"""
self.view += 1
# New primary is (view mod total_nodes)
new_primary = self.view % self.total_nodes
# Collect state from other nodes
# Send VIEW-CHANGE messages
# Elect new primary
# Catch up on missed operations
def _hash_operation(self, operation: Any) -> str:
"""Create cryptographic hash of operation"""
operation_str = json.dumps(operation, sort_keys=True).encode()
return hashlib.sha256(operation_str).hexdigest()
def _find_operation_by_digest(self, digest: str) -> Optional[Any]:
"""Find operation by its digest"""
# In real implementation, maintain mapping
pass
# Example: Byzantine-tolerant blockchain
class ByzantineBlockchain:
"""
Blockchain with Byzantine fault tolerance
Each block requires PBFT consensus
"""
def __init__(self, node_id: int, total_nodes: int):
self.pbft = PracticalByzantineFaultTolerance(node_id, total_nodes)
self.chain = []
self.pending_transactions = []
def propose_block(self, transactions: List[Any]) -> Optional[Dict]:
"""Propose new block to the network"""
if self._is_primary():
block = {
'index': len(self.chain),
'previous_hash': self._last_block_hash(),
'transactions': transactions,
'timestamp': time.time(),
'proposer': self.pbft.node_id
}
# Start PBFT consensus
pre_prepare = self.pbft.broadcast_prepare({
'type': 'BLOCK_PROPOSAL',
'block': block
})
return block
return None
def handle_block_proposal(self, message: ByzantineMessage) -> bool:
"""Process block proposal from other node"""
if not message.verify(self.pbft.public_keys):
return False
# Participate in PBFT consensus
prepare = self.pbft.receive_pre_prepare(message)
if not prepare:
return False
# Collect prepares from other nodes
# When we have 2f+1 matching prepares, send commit
# When we have 2f+1 matching commits, execute block
return True
def commit_block(self, block: Dict):
"""Add block to chain after consensus"""
# Verify block structure
if not self._validate_block(block):
return
# Verify consensus proof
# Add to chain
self.chain.append(block)
# Remove transactions from pending
self.pending_transactions = [
tx for tx in self.pending_transactions
if tx not in block['transactions']
]
def _is_primary(self) -> bool:
"""Check if this node is primary for current view"""
return self.pbft.node_id == (self.pbft.view % self.pbft.total_nodes)
def _last_block_hash(self) -> str:
"""Get hash of last block in chain"""
if not self.chain:
return '0' * 64
last_block = json.dumps(self.chain[-1], sort_keys=True).encode()
return hashlib.sha256(last_block).hexdigest()
def _validate_block(self, block: Dict) -> bool:
"""Validate block structure and transactions"""
# Implementation depends on application
return True
Byzantine Fault Models:
- Crash Faults: Nodes stop responding
- Omission Faults: Nodes drop messages
- Timing Faults: Nodes respond too slowly/quickly
- Byzantine Faults: Nodes behave arbitrarily (malicious)
PBFT Key Properties:
- Safety: All non-faulty nodes execute same operations in same order
- Liveness: Eventually execute operations despite faults
- Efficiency: 3-phase protocol with O(n²) messages
- Optimistic: Normal case requires 1 round trip
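The 3f+1 arithmetic deserves a worked example. Here is a sketch of the quorum sizes for a 7-node PBFT cluster, matching the faulty_limit computation above:

total_nodes = 7
f = (total_nodes - 1) // 3   # 2: faulty nodes tolerated

prepare_quorum = 2 * f + 1   # 5 matching PREPAREs required
commit_quorum = 2 * f + 1    # 5 matching COMMITs required

# Why 3f+1 nodes: any two quorums of size 2f+1 overlap in at least f+1
# nodes, so at least one honest node witnesses both decisions.
assert 2 * prepare_quorum - total_nodes >= f + 1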
6. Paxos Variants: The Consensus Family Tree
Paxos has spawned many variants optimized for different use cases.
from enum import Enum
from typing import List, Dict, Optional, Tuple
import random
class PaxosMessageType(Enum):
PREPARE = "prepare"
PROMISE = "promise"
ACCEPT = "accept"
ACCEPTED = "accepted"
NACK = "nack"
class PaxosVariant:
"""Base class for Paxos variants"""
def __init__(self, node_id: int, quorum_size: int):
self.node_id = node_id
self.quorum_size = quorum_size
# State
self.proposal_number = (0, node_id) # (round, node_id)
self.accepted_value = None
self.accepted_proposal = None
def generate_proposal_number(self) -> Tuple[int, int]:
"""Generate unique proposal number"""
current_round, _ = self.proposal_number
return (current_round + 1, self.node_id)
def is_quorum(self, responses: List) -> bool:
"""Check if we have quorum of responses"""
return len(responses) >= self.quorum_size
class ClassicPaxos(PaxosVariant):
"""Original Multi-Paxos algorithm"""
def __init__(self, node_id: int, total_nodes: int):
super().__init__(node_id, total_nodes // 2 + 1)
self.learned_values = {}
        self.instances = {}  # Consensus instance → value
def prepare_phase(self, instance_id: int) -> Dict:
"""Phase 1a: Send PREPARE"""
self.proposal_number = self.generate_proposal_number()
return {
'type': PaxosMessageType.PREPARE,
'instance': instance_id,
'proposal': self.proposal_number,
'from': self.node_id
}
def handle_promise(self, instance_id: int, promises: List[Dict]) -> Optional[Dict]:
"""Phase 2: Process PROMISEs, send ACCEPT if quorum"""
if not self.is_quorum(promises):
return None
# Find highest accepted value from promises
highest_accepted = None
highest_proposal = (0, 0)
for promise in promises:
if promise.get('accepted_proposal', (0, 0)) > highest_proposal:
highest_proposal = promise['accepted_proposal']
highest_accepted = promise.get('accepted_value')
# Use highest accepted value or our own
value = highest_accepted if highest_accepted else self._choose_value(instance_id)
return {
'type': PaxosMessageType.ACCEPT,
'instance': instance_id,
'proposal': self.proposal_number,
'value': value,
'from': self.node_id
}
def handle_accepted(self, instance_id: int, accepteds: List[Dict]):
"""Learn value if we have quorum of ACCEPTED"""
if self.is_quorum(accepteds):
# All ACCEPTED should have same value
values = set(msg['value'] for msg in accepteds)
if len(values) == 1:
learned_value = values.pop()
self.learned_values[instance_id] = learned_value
self.instances[instance_id] = learned_value
def _choose_value(self, instance_id: int):
"""Choose value to propose"""
# Application-specific logic
return f"value-for-{instance_id}"
class FastPaxos(PaxosVariant):
"""
Fast Paxos: Optimistic fast path
    The fast path trades larger quorums for speed: roughly 3/4 of nodes
    (4f+1 total to tolerate f faults, versus 2f+1 for classic Paxos)
"""
def __init__(self, node_id: int, total_nodes: int):
# Fast Paxos requires larger quorums
super().__init__(node_id, (3 * total_nodes) // 4 + 1)
self.fast_quorum = (total_nodes * 3) // 4
self.classic_quorum = total_nodes // 2 + 1
def fast_round(self, instance_id: int, value) -> Dict:
"""Fast path: Send value directly to acceptors"""
return {
'type': PaxosMessageType.ACCEPT,
'instance': instance_id,
'proposal': ('fast', self.node_id), # Special fast round
'value': value,
'from': self.node_id
}
def handle_fast_accept(self, instance_id: int, messages: List[Dict]) -> Optional[Dict]:
"""Check if fast path succeeded"""
# Need fast quorum for fast path
if len(messages) >= self.fast_quorum:
# Check for conflict
values = set(msg['value'] for msg in messages)
if len(values) == 1:
# Fast path succeeded
return {
'type': PaxosMessageType.ACCEPTED,
'instance': instance_id,
'value': values.pop()
}
# Fall back to classic Paxos
return None
class ByzantinePaxos(PaxosVariant):
"""Paxos variant that tolerates Byzantine faults"""
def __init__(self, node_id: int, total_nodes: int):
# Byzantine Paxos requires 3f+1 nodes
self.f = (total_nodes - 1) // 3
super().__init__(node_id, 2 * self.f + 1)
# Cryptographic signatures
self.signatures = {}
def byzantine_prepare(self, instance_id: int) -> Dict:
"""Phase 1 with signatures"""
proposal = self.generate_proposal_number()
signature = self._sign_proposal(instance_id, proposal)
return {
'type': PaxosMessageType.PREPARE,
'instance': instance_id,
'proposal': proposal,
'signature': signature,
'from': self.node_id
}
def verify_byzantine_promise(self, promise: Dict) -> bool:
"""Verify promise signature"""
if 'signature' not in promise:
return False
# Verify signature matches sender and content
# Implementation depends on crypto library
return True
def byzantine_accept(self, instance_id: int, value) -> Dict:
"""Phase 2 with signatures"""
signature = self._sign_value(instance_id, self.proposal_number, value)
return {
'type': PaxosMessageType.ACCEPT,
'instance': instance_id,
'proposal': self.proposal_number,
'value': value,
'signature': signature,
'from': self.node_id
}
def _sign_proposal(self, instance_id: int, proposal: Tuple[int, int]) -> bytes:
"""Sign proposal number"""
data = f"{instance_id}:{proposal[0]}:{proposal[1]}".encode()
# Real implementation would use actual signatures
return hashlib.sha256(data).digest()
def _sign_value(self, instance_id: int, proposal: Tuple[int, int], value) -> bytes:
"""Sign value"""
data = f"{instance_id}:{proposal[0]}:{proposal[1]}:{value}".encode()
return hashlib.sha256(data).digest()
# Example: Paxos-based configuration management
class PaxosConfigurationStore:
"""Configuration store using Paxos for consensus"""
def __init__(self, node_id: int, nodes: List[int]):
self.node_id = node_id
self.nodes = nodes
self.paxos = ClassicPaxos(node_id, len(nodes))
# Configuration state
self.config = {}
self.config_history = []
self.config_version = 0
def propose_config_change(self, key: str, value) -> bool:
"""Propose configuration change"""
# Create operation
operation = {
'type': 'SET_CONFIG',
'key': key,
'value': value,
'version': self.config_version + 1,
'proposer': self.node_id
}
# Run Paxos for this version
instance_id = self.config_version
# Phase 1: Prepare
prepare_msg = self.paxos.prepare_phase(instance_id)
self._broadcast(prepare_msg)
# Wait for promises
# In real implementation, use async/await or callbacks
# Phase 2: Accept
# Phase 3: Learn
# If consensus reached, apply change
if instance_id in self.paxos.learned_values:
learned = self.paxos.learned_values[instance_id]
self._apply_config_operation(learned)
return True
return False
def _apply_config_operation(self, operation: Dict):
"""Apply configuration operation"""
if operation['type'] == 'SET_CONFIG':
self.config[operation['key']] = operation['value']
self.config_version = operation['version']
self.config_history.append({
'version': operation['version'],
'operation': operation,
'timestamp': time.time()
})
def get_config(self, key: str, version: Optional[int] = None):
"""Get configuration value"""
if version:
# Get historical value
for entry in reversed(self.config_history):
if entry['version'] <= version:
if entry['operation']['key'] == key:
return entry['operation']['value']
return None
else:
# Get current value
return self.config.get(key)
def _broadcast(self, message: Dict):
"""Broadcast message to all nodes"""
# Implementation depends on network layer
pass
Paxos Variants Comparison:
| Variant | Nodes Required | Message Complexity | Use Case |
|---|---|---|---|
| Classic Paxos | 2f+1 | O(n²) | General consensus |
| Fast Paxos | 4f+1 | O(n) optimistic | Low latency when no conflicts |
| Byzantine Paxos | 3f+1 | O(n²) with crypto | Adversarial environments |
| Cheap Paxos | f+1 (with reconfiguration) | O(n²) | Resource-constrained |
| Egalitarian Paxos | 2f+1 | O(n) | Leaderless, symmetric |
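One detail every variant in the table relies on is totally ordered, globally unique proposal numbers. A minimal sketch with ClassicPaxos from above, showing why (round, node_id) pairs suffice:

node_1 = ClassicPaxos(node_id=1, total_nodes=5)
node_2 = ClassicPaxos(node_id=2, total_nodes=5)

p1 = node_1.generate_proposal_number()  # (1, 1)
p2 = node_2.generate_proposal_number()  # (1, 2)

# Tuples compare by round first, node id second, so proposals from
# different nodes in the same round are still totally ordered.
assert p1 < p2
assert (2, 1) > (1, 2)  # a higher round always wins the comparison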
7. Raft Extensions: Production Optimizations
Raft's simplicity has led to many production extensions.
from typing import List, Dict, Optional, Set
import time
import threading
from collections import defaultdict
class RaftWithLease:
"""
Raft with leader leases for linearizable reads
Leader maintains lease to serve reads without quorum
"""
def __init__(self, node_id: int, election_timeout: float = 1.0):
self.node_id = node_id
self.election_timeout = election_timeout
# Raft state
self.state = 'follower'
self.current_term = 0
self.voted_for = None
self.log = []
# Lease state
self.lease_expiry = 0
self.lease_duration = election_timeout / 2
# For lease validation
self.last_heartbeat = defaultdict(float)
def become_leader(self):
"""Transition to leader state"""
self.state = 'leader'
self.lease_expiry = time.time() + self.lease_duration
# Send initial heartbeat to establish lease
self._broadcast_heartbeat()
def renew_lease(self):
"""Renew leader lease"""
if self.state != 'leader':
return False
current_time = time.time()
# Check if majority acknowledge our leadership
acknowledgments = 0
for node, last_seen in self.last_heartbeat.items():
if current_time - last_seen < self.election_timeout:
acknowledgments += 1
if acknowledgments >= len(self.last_heartbeat) // 2 + 1:
self.lease_expiry = current_time + self.lease_duration
return True
return False
def has_valid_lease(self) -> bool:
"""Check if leader lease is still valid"""
if self.state != 'leader':
return False
return time.time() < self.lease_expiry
def serve_linearizable_read(self, key: str):
"""Serve read with linearizability guarantee"""
if not self.has_valid_lease():
# Lease expired, need to go through log
return self._serve_consistent_read(key)
# Lease valid, can serve directly
return self._read_local(key)
def _serve_consistent_read(self, key: str):
"""Read through log for consistency"""
# Ensure we're still leader
if not self._ensure_leadership():
raise NotLeaderError("Lost leadership")
# Read local
return self._read_local(key)
def _ensure_leadership(self) -> bool:
"""Ensure we're still leader, renew lease if needed"""
if self.state != 'leader':
return False
if not self.has_valid_lease():
return self.renew_lease()
return True
def _read_local(self, key: str):
"""Read from local state machine"""
# Implementation depends on state machine
pass
def _broadcast_heartbeat(self):
"""Send heartbeat to all followers"""
# Implementation depends on network
pass
class NotLeaderError(Exception):
pass
class RaftWithWitness:
"""
Raft with witness nodes for reduced storage
Witnesses participate in consensus but don't store log
"""
def __init__(self, node_id: int, is_witness: bool = False):
self.node_id = node_id
self.is_witness = is_witness
# Regular Raft state
self.current_term = 0
self.voted_for = None
# Witnesses only store metadata
if is_witness:
self.log = [] # Empty for witnesses
self.commit_index = 0
self.last_applied = 0
else:
# Full replica stores complete log
self.log = []
self.commit_index = 0
self.last_applied = 0
# Track which nodes are witnesses
self.witnesses = set()
def can_vote(self) -> bool:
"""Check if node can vote in elections"""
# Witnesses can vote
return True
def can_commit(self) -> bool:
"""Check if node can commit entries"""
# Only full replicas can commit
return not self.is_witness
def receive_entries(self, entries: List[Dict]) -> bool:
"""Receive log entries from leader"""
if self.is_witness:
# Witnesses don't store entries
# Just acknowledge receipt
return True
else:
# Full replicas append to log
self.log.extend(entries)
return True
def get_quorum_size(self, total_nodes: int, witness_count: int) -> int:
"""
Calculate quorum size with witnesses
With witnesses, quorum must include:
1. Majority of all nodes (including witnesses)
2. At least one full replica
"""
total = total_nodes + witness_count
return (total // 2) + 1
def is_quorum(self, responses: Set[int],
full_replicas: Set[int],
witnesses: Set[int]) -> bool:
"""Check if responses constitute a quorum"""
total_nodes = len(full_replicas) + len(witnesses)
quorum_size = self.get_quorum_size(len(full_replicas), len(witnesses))
if len(responses) < quorum_size:
return False
# Check that we have at least one full replica
if not any(node in full_replicas for node in responses):
return False
return True
class RaftWithPreVote:
"""
Raft with pre-vote to prevent disrupted leaders
from causing unnecessary elections
"""
    def __init__(self, node_id: int, election_timeout: float = 1.0):
        self.node_id = node_id
        self.state = 'follower'
        self.current_term = 0
        self.log = []  # needed for the up-to-date checks below
        self.election_timeout = election_timeout
        # Pre-vote state
        self.pre_vote_term = 0
        self.pre_votes_received = set()
        # Track connectivity
        self.last_contact = defaultdict(float)
    def start_pre_vote(self):
        """Start pre-vote phase before the actual election"""
        if self.state != 'follower':
            return  # only a follower whose election timer fired pre-votes
self.pre_vote_term = self.current_term + 1
self.pre_votes_received = {self.node_id}
# Request pre-votes from other nodes
self._request_pre_votes()
def _request_pre_votes(self):
"""Send pre-vote requests to all nodes"""
message = {
'type': 'PRE_VOTE_REQUEST',
'term': self.pre_vote_term,
'candidate_id': self.node_id,
'last_log_index': len(self.log) - 1 if self.log else -1,
'last_log_term': self.log[-1]['term'] if self.log else 0
}
self._broadcast(message)
def handle_pre_vote_request(self, request: Dict) -> Optional[Dict]:
"""Handle incoming pre-vote request"""
# Check if candidate's log is up-to-date
if not self._is_candidate_log_up_to_date(request):
return {
'type': 'PRE_VOTE_RESPONSE',
'term': self.current_term,
'vote_granted': False,
'from': self.node_id
}
# Check if we've recently heard from leader
current_time = time.time()
time_since_last_contact = current_time - max(self.last_contact.values(), default=0)
# Only grant pre-vote if we haven't heard from leader recently
if time_since_last_contact < self.election_timeout:
return {
'type': 'PRE_VOTE_RESPONSE',
'term': self.current_term,
'vote_granted': False,
'from': self.node_id
}
# Grant pre-vote
return {
'type': 'PRE_VOTE_RESPONSE',
'term': self.current_term,
'vote_granted': True,
'from': self.node_id
}
def collect_pre_votes(self, responses: List[Dict]) -> bool:
"""Check if we have majority pre-votes"""
granted = sum(1 for r in responses if r['vote_granted'])
total_nodes = self._get_total_nodes()
if granted >= total_nodes // 2 + 1:
# Start real election
return self._start_real_election()
return False
def _start_real_election(self) -> bool:
"""Start real election after successful pre-vote"""
self.current_term = self.pre_vote_term
self.state = 'candidate'
# Request actual votes
return self._request_votes()
def _is_candidate_log_up_to_date(self, request: Dict) -> bool:
"""Check if candidate's log is at least as up-to-date as ours"""
our_last_index = len(self.log) - 1 if self.log else -1
our_last_term = self.log[-1]['term'] if self.log else 0
cand_last_index = request['last_log_index']
cand_last_term = request['last_log_term']
if cand_last_term > our_last_term:
return True
elif cand_last_term == our_last_term and cand_last_index >= our_last_index:
return True
return False
# Example: Raft-based key-value store with extensions
class ProductionRaftKVStore:
"""Production-ready key-value store using Raft with extensions"""
def __init__(self, node_id: int, nodes: List[int]):
self.node_id = node_id
self.nodes = nodes
# Use Raft with lease for fast reads
self.raft = RaftWithLease(node_id)
# Key-value state machine
self.kv_store = {}
self.kv_versions = {} # key -> [(version, value)]
# Thread safety
self.lock = threading.RLock()
def put(self, key: str, value, require_linearizability: bool = True) -> bool:
"""Put key-value pair with optional linearizability"""
with self.lock:
# Create log entry
entry = {
'type': 'PUT',
'key': key,
'value': value,
'term': self.raft.current_term,
'timestamp': time.time()
}
# Replicate through Raft
success = self._replicate_entry(entry)
if success:
# Apply to state machine
self._apply_put(key, value)
if require_linearizability:
# Wait for commitment
return self._wait_for_commit(entry)
return True
return False
def get(self, key: str, linearizable: bool = True):
"""Get value with optional linearizability"""
if linearizable:
# Use leader lease for linearizable read
return self.raft.serve_linearizable_read(key)
else:
# Eventually consistent read
with self.lock:
return self.kv_store.get(key)
def _apply_put(self, key: str, value):
"""Apply PUT operation to state machine"""
if key not in self.kv_versions:
self.kv_versions[key] = []
version = len(self.kv_versions[key]) + 1
self.kv_versions[key].append((version, value))
self.kv_store[key] = value
def _replicate_entry(self, entry: Dict) -> bool:
"""Replicate log entry through Raft"""
# Implementation depends on Raft library
# In production, use libraries like etcd's raft or hashicorp/raft
pass
def _wait_for_commit(self, entry: Dict) -> bool:
"""Wait for entry to be committed"""
# Implementation depends on commit notification
pass
Raft Extensions Summary:
| Extension | Purpose | Use Case |
|---|---|---|
| Leader Lease | Fast linearizable reads | Read-heavy workloads |
| Witness Nodes | Reduce storage requirements | Large clusters with storage constraints |
| Pre-Vote | Prevent election storms | Unstable networks |
| Joint Consensus | Membership changes | Rolling updates, scaling |
| Log Compaction | Control log growth | Long-running systems |
| Read Index | Leader-less consistent reads | Multi-datacenter |
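A hedged usage sketch of the lease-based read path in RaftWithLease above (the network and state-machine methods are stubs, so this only traces the control flow; the key name is illustrative):

node = RaftWithLease(node_id=0, election_timeout=1.0)
node.become_leader()  # establishes a lease of election_timeout / 2

if node.has_valid_lease():
    # Fast path: serve the read locally without a quorum round trip
    value = node.serve_linearizable_read("config/feature-flag")
else:
    # Lease expired: fall back to the quorum-checked read path
    value = node._serve_consistent_read("config/feature-flag")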
8. Virtual Synchrony Model: Group Communication
Virtual synchrony provides a consistent view of group membership and message ordering.
from typing import List, Dict, Set, Optional
from enum import Enum
import time
from collections import defaultdict
class VSMessageType(Enum):
JOIN = "join"
LEAVE = "leave"
MULTICAST = "multicast"
ACK = "ack"
VIEW_CHANGE = "view_change"
class VirtualSynchrony:
"""
Virtual Synchrony: Consistent group membership and message ordering
Properties:
1. Virtual synchrony: All members see events in same order
2. View synchrony: All members see membership changes in same order
3. Safe delivery: Messages delivered only in views where sender present
"""
def __init__(self, node_id: int):
self.node_id = node_id
# Group state
self.current_view = [] # List of members in current view
self.view_id = 0
# Message queues
self.pending_messages = [] # Messages waiting for view agreement
self.delivered_messages = [] # Messages already delivered
# Causal ordering
self.vector_clock = {}
# Failure detection
self.suspected = set()
self.heartbeats = defaultdict(float)
def join_group(self, members: List[int]) -> bool:
"""Join or form a new group"""
if self.node_id in members and self.node_id not in self.current_view:
# Start view change protocol
self._initiate_view_change(members)
return True
return False
def leave_group(self):
"""Leave the group gracefully"""
if self.node_id in self.current_view:
# Send leave notification
leave_msg = {
'type': VSMessageType.LEAVE,
'sender': self.node_id,
'view_id': self.view_id,
'timestamp': time.time()
}
self._multicast(leave_msg)
# Remove from local view
self.current_view.remove(self.node_id)
def multicast(self, message: Dict, total_order: bool = True) -> bool:
"""
Multicast message to group
total_order: If True, use total ordering (atomic broadcast)
If False, use causal ordering
"""
if self.node_id not in self.current_view:
return False
# Create message with metadata
msg = {
'type': VSMessageType.MULTICAST,
'sender': self.node_id,
'view_id': self.view_id,
'message_id': self._generate_message_id(),
'payload': message,
'timestamp': time.time(),
'total_order': total_order
}
if total_order:
# For total ordering, use consensus protocol
return self._total_order_multicast(msg)
else:
# For causal ordering, use vector clocks
return self._causal_multicast(msg)
def _total_order_multicast(self, message: Dict) -> bool:
"""Atomic broadcast with total ordering"""
# Phase 1: Propose sequence number
proposal = {
'type': 'PROPOSE',
'message': message,
'proposer': self.node_id,
'view': self.view_id
}
# Send to all members
responses = self._send_to_all(proposal)
# Phase 2: Agree on sequence (consensus)
if self._is_quorum(responses):
# Assign sequence number
sequence = self._assign_sequence(message)
# Phase 3: Deliver in order
self._deliver_in_order(message, sequence)
return True
return False
def _causal_multicast(self, message: Dict) -> bool:
"""Multicast with causal ordering"""
# Update vector clock
if self.node_id not in self.vector_clock:
self.vector_clock[self.node_id] = 0
self.vector_clock[self.node_id] += 1
# Attach vector clock to message
message['vector_clock'] = self.vector_clock.copy()
# Broadcast message
self._broadcast(message)
# Buffer for causal delivery
self.pending_messages.append(message)
self._deliver_causally()
return True
def _deliver_causally(self):
"""Deliver messages in causal order"""
# Sort pending messages by vector clock
deliverable = []
for msg in self.pending_messages:
if self._causally_ready(msg):
deliverable.append(msg)
# Sort by timestamp (or other deterministic order)
deliverable.sort(key=lambda x: (x['timestamp'], x['sender']))
for msg in deliverable:
self._deliver_message(msg)
self.pending_messages.remove(msg)
def _causally_ready(self, message: Dict) -> bool:
"""Check if message is causally ready for delivery"""
if 'vector_clock' not in message:
return True
msg_vc = message['vector_clock']
# Check if all causal dependencies are satisfied
for node, counter in msg_vc.items():
if self.vector_clock.get(node, 0) < counter:
# Missing causal dependency
return False
return True
def _deliver_message(self, message: Dict):
"""Deliver message to application"""
# Update vector clock
if 'vector_clock' in message:
for node, counter in message['vector_clock'].items():
self.vector_clock[node] = max(self.vector_clock.get(node, 0), counter)
# Increment own counter for delivery event
self.vector_clock[self.node_id] = self.vector_clock.get(self.node_id, 0) + 1
# Add to delivered messages
self.delivered_messages.append(message)
# Notify application
self._application_deliver(message['payload'])
def _initiate_view_change(self, new_members: List[int]):
"""Initiate view change protocol"""
# Phase 1: Propose new view
view_proposal = {
'type': VSMessageType.VIEW_CHANGE,
'proposed_view': new_members,
'view_id': self.view_id + 1,
'proposer': self.node_id
}
# Send to all proposed members
responses = []
for member in new_members:
if member != self.node_id:
# In real system, send over network
pass
# Phase 2: Wait for acknowledgments
if len(responses) >= len(new_members) // 2 + 1:
# Phase 3: Install new view
self._install_view(new_members, self.view_id + 1)
def _install_view(self, members: List[int], view_id: int):
"""Install new view"""
old_view = set(self.current_view)
new_view = set(members)
# Members that left
left = old_view - new_view
# Members that joined
joined = new_view - old_view
# Update state
self.current_view = members.copy()
self.view_id = view_id
# Handle state transfer for new members
for new_member in joined:
self._transfer_state_to(new_member)
# Clean up state for departed members
for departed in left:
self._cleanup_state_for(departed)
def _transfer_state_to(self, new_member: int):
"""Transfer state to new group member"""
# Send current view
# Send undelivered messages
# Send vector clock state
pass
def _application_deliver(self, message):
"""Callback for application to process delivered message"""
# Override in subclass
pass
def _generate_message_id(self) -> str:
"""Generate unique message ID"""
return f"{self.node_id}:{time.time():.6f}:{hash(str(self.delivered_messages))}"
def _is_quorum(self, responses: List) -> bool:
"""Check if we have quorum of responses"""
return len(responses) >= len(self.current_view) // 2 + 1
def _send_to_all(self, message: Dict) -> List[Dict]:
"""Send message to all group members"""
# Implementation depends on network
return []
def _broadcast(self, message: Dict):
"""Broadcast message to group"""
# Implementation depends on network
pass
# Example: Virtual synchrony for distributed gaming
class DistributedGameServer:
"""Multiplayer game server using virtual synchrony"""
def __init__(self, server_id: int):
self.vs = VirtualSynchrony(server_id)
self.game_state = {}
self.players = {}
# Game-specific
self.game_tick = 0
self.pending_actions = []
def player_join(self, player_id: int, player_data: Dict):
"""Handle player joining game"""
# Update local state
self.players[player_id] = player_data
# Multicast join event
join_msg = {
'action': 'PLAYER_JOIN',
'player_id': player_id,
'player_data': player_data,
'tick': self.game_tick
}
self.vs.multicast(join_msg, total_order=True)
def player_action(self, player_id: int, action: Dict):
"""Handle player action"""
# Validate action
if not self._validate_action(player_id, action):
return False
# Create action message
action_msg = {
'action': 'PLAYER_ACTION',
'player_id': player_id,
'action': action,
'tick': self.game_tick,
'timestamp': time.time()
}
# Use causal ordering for game actions
return self.vs.multicast(action_msg, total_order=False)
def game_tick_update(self):
"""Process game tick"""
self.game_tick += 1
# Collect all actions for this tick
tick_actions = [a for a in self.pending_actions
if a.get('tick') == self.game_tick - 1]
# Apply actions in deterministic order
tick_actions.sort(key=lambda x: (x['timestamp'], x['player_id']))
for action in tick_actions:
self._apply_game_action(action)
# Broadcast game state update
state_update = {
'action': 'GAME_STATE_UPDATE',
'tick': self.game_tick,
'game_state': self.game_state,
'players': self.players
}
self.vs.multicast(state_update, total_order=True)
def _application_deliver(self, message: Dict):
"""Process delivered messages"""
action = message.get('action')
if action == 'PLAYER_JOIN':
self._handle_player_join(message)
elif action == 'PLAYER_ACTION':
self._handle_player_action(message)
elif action == 'GAME_STATE_UPDATE':
self._handle_state_update(message)
elif action == 'PLAYER_LEAVE':
self._handle_player_leave(message)
def _handle_player_join(self, message: Dict):
"""Handle player join event"""
player_id = message['player_id']
player_data = message['player_data']
# Update local state
self.players[player_id] = player_data
# Initialize player in game state
self.game_state[player_id] = {
'position': (0, 0, 0),
'health': 100,
'score': 0
}
def _handle_player_action(self, message: Dict):
"""Handle player action event"""
# Add to pending actions for next tick
self.pending_actions.append(message)
def _apply_game_action(self, action: Dict):
"""Apply game action to state"""
player_id = action['player_id']
action_data = action['payload']
# Update game state based on action
# This is game-specific logic
pass
def _handle_state_update(self, message: Dict):
"""Handle game state update"""
# Update local state
self.game_state = message['game_state']
self.players = message['players']
self.game_tick = message['tick']
def _validate_action(self, player_id: int, action: Dict) -> bool:
"""Validate player action"""
if player_id not in self.players:
return False
# Game-specific validation
return True
Virtual Synchrony Guarantees:
- View Synchrony: All members see same sequence of views
- Message Ordering: Messages delivered in same order to all members
- Atomicity: All or nothing delivery within a view
- Virtual Synchrony: Messages from same view delivered in same relative order
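To see how those guarantees surface in the API, here's a minimal usage sketch of the game server above. It assumes the stubbed VirtualSynchrony transport from this section, so multicasts don't actually reach other nodes; it only illustrates which events use total versus causal ordering:
# Hypothetical single-node walkthrough (stub transport: no real delivery)
server = DistributedGameServer(server_id=1)
server.player_join(42, {'name': 'alice'})             # total-order multicast
server.player_action(42, {'type': 'move', 'dx': 1})   # causal multicast
server.game_tick_update()                             # total-order state update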
9. Distributed Complexity Theory
Understanding the fundamental limits of distributed computation.
from typing import List, Set, Dict, Tuple
import math
from enum import Enum
class ComplexityClass(Enum):
"""Complexity classes for distributed problems"""
LOCAL = "O(1) rounds"
LOGARITHMIC = "O(log n) rounds"
POLYLOG = "O(polylog n) rounds"
POLYNOMIAL = "O(poly n) rounds"
GLOBAL = "Ω(n) rounds"
class DistributedProblem:
"""Base class for distributed problems"""
def __init__(self, n: int):
self.n = n # Number of nodes
self.rounds = 0
self.messages_sent = 0
self.bits_sent = 0
def lower_bound_rounds(self) -> int:
"""Lower bound on number of communication rounds"""
raise NotImplementedError
def lower_bound_messages(self) -> int:
"""Lower bound on number of messages"""
raise NotImplementedError
def lower_bound_bits(self) -> int:
"""Lower bound on number of bits"""
raise NotImplementedError
def complexity_class(self) -> ComplexityClass:
"""Complexity class of the problem"""
raise NotImplementedError
class LeaderElection(DistributedProblem):
"""Leader election in anonymous ring"""
def __init__(self, n: int):
super().__init__(n)
def lower_bound_rounds(self) -> int:
"""
Lower bound: Ω(n) rounds for anonymous ring
Even with unique IDs: Ω(log n) rounds
"""
return self.n # For anonymous ring
def lower_bound_messages(self) -> int:
"""
Lower bound: Ω(n log n) messages for comparison-based
With unique IDs: O(n) messages possible
"""
return self.n * math.ceil(math.log2(self.n))
def lower_bound_bits(self) -> int:
"""Each message needs ฮฉ(log n) bits for ID"""
return self.lower_bound_messages() * math.ceil(math.log2(self.n))
def complexity_class(self) -> ComplexityClass:
return ComplexityClass.GLOBAL
class ConsensusProblem(DistributedProblem):
"""Distributed consensus with crash faults"""
def __init__(self, n: int, f: int):
super().__init__(n)
self.f = f # Maximum faulty nodes
def lower_bound_rounds(self) -> int:
"""
FLP: No solution in asynchronous systems
Synchronous: f+1 rounds necessary and sufficient
"""
return self.f + 1
def lower_bound_messages(self) -> int:
"""
Lower bound: Ω(n²) messages for some algorithms
Optimal: O(n) messages per round
"""
return (self.f + 1) * self.n
def lower_bound_bits(self) -> int:
"""Each message contains value + metadata"""
return self.lower_bound_messages() * 32 # Assuming 32-bit values
def complexity_class(self) -> ComplexityClass:
return ComplexityClass.POLYNOMIAL
class ColoringProblem(DistributedProblem):
"""Graph coloring in distributed setting"""
def __init__(self, n: int, delta: int):
super().__init__(n)
self.delta = delta # Maximum degree
def lower_bound_rounds(self) -> int:
"""
ฮ+1 coloring:
- Deterministic: ฮฉ(log* n) rounds
- Randomized: O(log log n) rounds expected
"""
return math.ceil(math.log2(self.delta))
def lower_bound_messages(self) -> int:
"""Each node communicates with neighbors each round"""
return self.n * self.delta * self.lower_bound_rounds()
def complexity_class(self) -> ComplexityClass:
return ComplexityClass.LOGARITHMIC
class MSTProblem(DistributedProblem):
"""Minimum Spanning Tree in distributed setting"""
def __init__(self, n: int):
super().__init__(n)
def lower_bound_rounds(self) -> int:
"""
Lower bound: Ω(√(n/log n) + D) rounds
where D is network diameter
Best known: O(√n log* n) rounds
"""
return math.ceil(math.sqrt(self.n / math.log(self.n)))
def lower_bound_messages(self) -> int:
"""ฮฉ(m) messages where m is number of edges"""
# Complete graph has m = n(n-1)/2 edges
return self.n * (self.n - 1) // 2
def complexity_class(self) -> ComplexityClass:
return ComplexityClass.POLYNOMIAL
class LowerBoundProver:
"""Prove lower bounds using information theory"""
@staticmethod
def fooling_set_method(problem_size: int, output_size: int) -> int:
"""
Fooling set method for communication complexity
Returns: Lower bound on bits that must be communicated
"""
# Size of fooling set gives lower bound
return math.ceil(math.log2(problem_size / output_size))
@staticmethod
def crossing_sequence_method(diameter: int, states: int) -> int:
"""
Crossing sequence method for round complexity
Returns: Lower bound on number of rounds
"""
# Number of different crossing sequences
sequences = states ** diameter
return math.ceil(math.log2(sequences))
@staticmethod
def indistinguishability_argument(configurations: int) -> int:
"""
Indistinguishability argument for impossibility proofs
Used in FLP impossibility proof
"""
return math.ceil(math.log2(configurations))
# Example: Proving lower bounds for specific problems
class LowerBoundExamples:
"""Concrete lower bound proofs"""
@staticmethod
def prove_consensus_lower_bound(n: int, f: int):
"""Proof sketch for consensus lower bounds"""
print(f"=== Consensus Lower Bounds (n={n}, f={f}) ===")
# Round lower bound: f+1
print(f"1. Round lower bound: {f+1} rounds")
print(" Proof: Need f+1 rounds to wait for all non-faulty nodes")
# Message lower bound
min_messages = (f + 1) * n
print(f"2. Message lower bound: ฮฉ({min_messages}) messages")
print(" Proof: Each round requires at least n messages")
# Impossibility in async
print("3. FLP Impossibility: No deterministic solution in async systems")
print(" Proof: Always possible to keep system in bivalent state")
@staticmethod
def prove_leader_election_lower_bound(n: int):
"""Proof sketch for leader election lower bounds"""
print(f"\n=== Leader Election Lower Bounds (n={n}) ===")
# For anonymous ring
print(f"1. Anonymous ring: ฮฉ({n}) rounds")
print(" Proof: Need to break symmetry, information travels O(1) per round")
# Message complexity
print(f"2. Message complexity: ฮฉ({n} log {n}) messages")
print(" Proof: Comparison-based algorithm needs pairwise comparisons")
# With unique IDs
print(f"3. With unique IDs: O({n}) messages possible")
print(" Proof: Can use ID propagation algorithms")
# Complexity hierarchy of distributed problems
distributed_hierarchy = {
ComplexityClass.LOCAL: [
"Maximal Independent Set (randomized)",
"Weak coloring",
"Network decomposition (fragmented)"
],
ComplexityClass.LOGARITHMIC: [
"MIS (deterministic)",
"ฮ+1 coloring",
"Maximal matching"
],
ComplexityClass.POLYLOG: [
"Minimum dominating set",
"Minimum vertex cover approximation",
"Network decomposition"
],
ComplexityClass.POLYNOMIAL: [
"Minimum spanning tree",
"Shortest paths",
"Max flow"
],
ComplexityClass.GLOBAL: [
"Consensus",
"Byzantine agreement",
"Leader election (anonymous)"
]
}
Key Models Behind These Results:
- LOCAL Model: synchronous rounds, unbounded message size (locality is the only limit)
- CONGEST Model: O(log n)-bit messages per edge per round
- CONGESTED CLIQUE: every pair of nodes can communicate directly
- Beeping Model: nodes can only beep or stay silent (extremely limited communication)
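As a quick sanity check of the classes above, a short driver (using nothing beyond the definitions in this section) prints the encoded bounds side by side:
# Compare the (simplified) bounds for a 64-node system
for problem in (LeaderElection(64), ConsensusProblem(64, f=5), MSTProblem(64)):
    name = type(problem).__name__
    print(f"{name}: rounds >= {problem.lower_bound_rounds()}, "
          f"messages >= {problem.lower_bound_messages()}, "
          f"class: {problem.complexity_class().value}")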
10. Lower Bounds: The Minimum Cost of Coordination
Proving what's fundamentally impossible or expensive.
import numpy as np
from typing import List, Dict, Set
import math
class InformationTheoreticLowerBounds:
"""Lower bounds using information theory"""
@staticmethod
def communication_complexity(input_size: int, output_size: int) -> int:
"""
Lower bound on bits that must be communicated
Based on: Need to distinguish between all possible inputs
"""
# Number of possible inputs
possible_inputs = 2 ** input_size
# Number of possible outputs
possible_outputs = 2 ** output_size
# Minimum bits to distinguish inputs
return math.ceil(math.log2(possible_inputs / possible_outputs))
@staticmethod
def round_complexity(diameter: int, local_states: int) -> int:
"""
Lower bound on number of rounds
Based on: Information propagates at most one hop per round
"""
# Information needs to travel diameter distance
return diameter
@staticmethod
def message_complexity(nodes: int, edges: int) -> int:
"""
Lower bound on number of messages
Based on: Need to communicate across cuts in network
"""
# Minimum cut in network
return edges // 2
@staticmethod
def storage_complexity(state_size: int, replicas: int) -> int:
"""
Lower bound on storage overhead
Based on: Need to store enough information to recover from failures
"""
# For f faults, need f+1 replicas
return state_size * replicas
class ConsensusLowerBounds:
"""Specific lower bounds for consensus problems"""
@staticmethod
def flp_impossibility():
"""
Fischer-Lynch-Paterson Impossibility
No deterministic consensus protocol is possible in
asynchronous systems with one crash fault
"""
proof_steps = [
"1. Assume a protocol exists that always terminates",
"2. Construct an execution that keeps system in bivalent state",
"3. Show adversary can always delay messages to maintain bivalence",
"4. Therefore, protocol cannot always terminate โ contradiction"
]
return proof_steps
@staticmethod
def dls_impossibility(partial_synchrony: bool):
"""
Dwork-Lynch-Stockmeyer Impossibility
In partially synchronous systems:
- If clocks can drift unboundedly, consensus impossible
- With bounded drift, consensus possible with failure detectors
"""
if partial_synchrony:
return "Possible with failure detectors (e.g., โP)"
else:
return "Impossible with unbounded clock drift"
@staticmethod
def byzantine_lower_bound(nodes: int, faults: int) -> Dict:
"""
Lower bounds for Byzantine consensus
Requires: n > 3f without authentication (oral messages)
n > 2f with authentication (signed messages)
"""
results = {
'minimum_nodes_no_auth': 3 * faults + 1,
'minimum_nodes_with_auth': 2 * faults + 1,
'round_lower_bound': faults + 1,
'message_complexity': 'Ω(n²)',
'cryptographic_assumptions': 'Needed for digital signatures'
}
return results
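Plugging in small values shows how fast the quorum requirements grow; a sketch using the bounds encoded above:
# Byzantine fault tolerance is expensive: tolerating f faults
# needs 3f+1 nodes without signatures, 2f+1 with them
for f in (1, 2, 5):
    bounds = ConsensusLowerBounds.byzantine_lower_bound(nodes=3 * f + 1, faults=f)
    print(f"f={f}: no-auth nodes >= {bounds['minimum_nodes_no_auth']}, "
          f"with-auth nodes >= {bounds['minimum_nodes_with_auth']}, "
          f"rounds >= {bounds['round_lower_bound']}")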
class DataStructureLowerBounds:
"""Lower bounds for distributed data structures"""
@staticmethod
def crdt_convergence(state_size: int, operations: int) -> int:
"""
Lower bound on metadata for CRDT convergence
Need to track enough information to merge any pair of states
"""
# For grow-only counter: O(n) where n = number of replicas
# For observed-remove set: O(m) where m = number of elements
return math.ceil(state_size * math.log2(operations))
@staticmethod
def consistent_hashing_load(distribution: str, nodes: int, items: int) -> float:
"""
Load balance lower bounds for consistent hashing
Returns: Minimum maximum load any node must handle
"""
if distribution == 'uniform':
# With k choices: O(log log n / log k) improvement
base_load = items / nodes
optimal_load = base_load * (1 + 1/math.log2(nodes))
return optimal_load
elif distribution == 'bounded':
# With bounded loads: maximum load within a constant factor of average
return math.ceil(items / nodes)
else:
raise ValueError(f"Unknown distribution: {distribution}")
@staticmethod
def quorum_system_availability(quorum_size: int,
failure_prob: float,
nodes: int) -> float:
"""
Lower bound on availability of quorum systems
Trade-off between load and availability
"""
# Probability all quorums fail
prob_all_fail = failure_prob ** quorum_size
# Availability lower bound
availability = 1 - prob_all_fail
# Lower bound from probabilistic analysis
lower_bound = 1 - math.exp(-nodes * failure_prob)
return max(availability, lower_bound)
class NetworkLowerBounds:
"""Lower bounds for network protocols"""
@staticmethod
def tcp_throughput(rtt: float, loss_rate: float, mss: int = 1460) -> float:
"""
Mathis Equation: TCP throughput upper bound
throughput <= (MSS / RTT) * (1 / sqrt(p))
where p = loss rate
"""
if loss_rate == 0:
return float('inf')
throughput = (mss / rtt) * (1 / math.sqrt(loss_rate))
return throughput
@staticmethod
def bandwidth_delay_product(bandwidth: float, rtt: float) -> float:
"""
BDP = Bandwidth × Round Trip Time
Minimum buffer size needed to fill the pipe
"""
return bandwidth * rtt
@staticmethod
def queueing_delay(arrival_rate: float,
service_rate: float,
cv_a: float = 1.0,
cv_s: float = 1.0) -> float:
"""
Kingman's Formula for G/G/1 queue
E[W] ≈ (ρ / (1 - ρ)) * ((cv_a² + cv_s²) / 2) * E[S]
where ρ = λ/μ, cv = coefficient of variation
"""
if arrival_rate >= service_rate:
return float('inf')
rho = arrival_rate / service_rate
service_time = 1 / service_rate
wait_time = (rho / (1 - rho)) * ((cv_a**2 + cv_s**2) / 2) * service_time
return wait_time
# Example: Proving specific lower bounds
class ConcreteLowerBoundProofs:
"""Concrete proofs of distributed systems lower bounds"""
@staticmethod
def proof_consensus_messages(n: int):
"""
Proof that consensus requires Ω(n²) messages
in worst case for some algorithms
"""
print("=== Proof: Consensus requires ฮฉ(nยฒ) messages ===")
print("\n1. Setup:")
print(f" - n = {n} nodes")
print(" - Each node has initial value 0 or 1")
print(" - Need all nodes to decide same value")
print("\n2. Adversarial strategy:")
print(" - Network is complete graph")
print(" - Adversary controls scheduling")
print(" - Can delay specific messages")
print("\n3. Information-theoretic argument:")
print(" - Each node initially knows only its own value")
print(" - To learn others' values, must receive messages")
print(f" - Need at least n(n-1)/2 messages in worst case")
print("\n4. Reduction:")
print(" - Reduce from set disjointness problem")
print(" - Known to require ฮฉ(n) bits for 2-party")
print(" - Generalizes to ฮฉ(nยฒ) for n-party")
print(f"\n5. Result: ฮฉ({n}ยฒ) messages required")
@staticmethod
def proof_leader_election_anonymous(n: int):
"""
Proof that leader election in anonymous ring
requires Ω(n) rounds
"""
print(f"\n=== Proof: Leader election requires ฮฉ({n}) rounds in anonymous ring ===")
print("\n1. Symmetry argument:")
print(" - All nodes identical (anonymous)")
print(" - Same program, no unique IDs")
print(" - Initially symmetric situation")
print("\n2. Information propagation:")
print(" - Ring diameter = โn/2โ")
print(" - Information travels 1 hop per round")
print(f" - Need at least โ{n}/2โ rounds to break symmetry")
print("\n3. Indistinguishability:")
print(" - After k rounds, nodes at distance > k see same neighborhood")
print(" - Cannot distinguish themselves")
print(" - Need enough rounds for information to travel full diameter")
print(f"\n4. Result: ฮฉ({n}) rounds required")
@staticmethod
def proof_byzantine_agreement(f: int):
"""
Proof that Byzantine agreement requires n > 3f
"""
print(f"\n=== Proof: Byzantine agreement requires n > 3{f} ===")
print("\n1. Setup:")
print(f" - f = {f} Byzantine (malicious) nodes")
print(" - Rest are honest")
print(" - Need all honest nodes to agree")
print("\n2. Partition argument:")
print(" - Divide nodes into 3 groups of size f")
print(" - Byzantine nodes can lie to different groups")
print(" - If n โค 3f, Byzantine can prevent agreement")
print("\n3. Contradiction scenario:")
print(" - Group A sees value 0 from Byzantine")
print(" - Group B sees value 1 from Byzantine")
print(" - Group C (Byzantine) can be inconsistent")
print(" - Honest nodes in A and B cannot agree")
print(f"\n4. Result: Need n > 3{f} for Byzantine agreement")
Fundamental Lower Bounds Summary:
- FLP Impossibility: no deterministic async consensus with even one crash fault
- CAP Theorem: cannot have consistency, availability, and partition tolerance simultaneously
- Consensus Lower Bound: f+1 rounds with f crash faults
- Byzantine Agreement: need n > 3f nodes (oral messages)
- Leader Election: Ω(n) rounds in anonymous networks
- Communication Complexity: Ω(n log n) bits for sorting
11. Impossibility Results: What Can't Be Done
Some problems are fundamentally impossible in distributed systems.
from typing import Set, List, Dict
import itertools
class ImpossibilityResults:
"""Collection of impossibility results in distributed computing"""
@staticmethod
def flp_impossibility_detail():
"""
Detailed proof of FLP impossibility
Theorem: No deterministic consensus protocol can guarantee
termination in an asynchronous system with one crash fault
"""
proof = {
'assumptions': [
"Asynchronous message passing",
"Crash failures (stop failures)",
"Deterministic algorithms",
"No message loss (fair loss)",
"Reliable links"
],
'definitions': [
"Configuration: Global state of system",
"Step: Action by a single process",
"Schedule: Sequence of steps",
"Univalent: All extensions lead to same decision",
"Bivalent: Some extensions lead to 0, some to 1"
],
'proof_steps': [
"1. Show initial configuration is bivalent",
"2. Show we can keep system bivalent forever",
"3. Construct adversary that delays messages to maintain bivalence",
"4. Therefore, cannot guarantee termination"
],
'implications': [
"Need randomization for guaranteed termination",
"Or need partial synchrony assumptions",
"Or need failure detectors"
]
}
return proof
@staticmethod
def cap_impossibility_detail():
"""
CAP Theorem impossibility
Theorem: Can't achieve all three of:
- Consistency (all nodes see same data)
- Availability (every request gets response)
- Partition tolerance (system works despite network partitions)
"""
proof = {
'formal_statement': "In async network with partitions, "
"impossible to implement read/write register "
"that is both available and atomic",
'proof_sketch': [
"1. Assume partition separates nodes",
"2. Write happens on one side",
"3. Read happens on other side",
"4. To be available, must respond",
"5. To be consistent, must return latest write",
"6. Impossible without communication across partition",
"7. Contradiction"
],
'practical_implications': [
"Must choose AP or CP during partitions",
"Can have CA only if no partitions guaranteed",
"Real systems choose based on use case"
]
}
return proof
@staticmethod
def consensus_impossibility(models: List[str]) -> Dict[str, str]:
"""
Check consensus impossibility under different models
"""
results = {}
for model in models:
if model == 'async_crash':
results[model] = 'impossible (FLP)'
elif model == 'async_byzantine':
results[model] = 'impossible'
elif model == 'partial_sync_crash':
results[model] = 'possible with failure detectors'
elif model == 'sync_crash':
results[model] = 'possible (f+1 rounds)'
elif model == 'sync_byzantine':
results[model] = f'possible (n > 3f)'
elif model == 'anonymous':
results[model] = 'impossible for some problems'
return results
@staticmethod
def two_generals_problem():
"""
Two Generals Problem (coordinated attack)
Theorem: Impossible to guarantee coordinated attack
over unreliable communication
"""
return {
'problem': "Two generals need to coordinate attack time",
'constraints': [
"Communication via messengers",
"Messengers can be captured",
"Need agreement on exact time",
"Attack fails without perfect coordination"
],
'impossibility': "No protocol guarantees agreement with probability 1",
'reason': "Need infinite message exchanges for certainty",
'solution': "Accept probabilistic agreement or use trusted third party"
}
@staticmethod
def byzantine_generals_problem(f: int):
"""
Byzantine Generals Problem
Theorem: Need n > 3f for Byzantine agreement
with oral messages (no signatures)
"""
return {
'problem': f"{f} traitorous generals among n total",
'requirements': [
"All loyal generals decide same plan",
"If commander loyal, all loyal generals follow his plan"
],
'impossibility': "If n ≤ 3f, no solution exists",
'proof': "Partition generals into 3 groups, traitors can confuse",
'with_signatures': "With signatures, need only n > 2f"
}
@staticmethod
def deterministic_consensus_async():
"""
Strengthening of FLP
Theorem: No deterministic protocol solves consensus
in async systems even with:
- Only one crash fault
- Reliable links
- No message loss
"""
return {
'stronger_form': "Safety can always be preserved; termination is what fails",
'results': [
"Agreement can be guaranteed in every execution",
"Validity can be guaranteed in every execution",
"Termination is the property that cannot be guaranteed"
],
'workarounds': [
"Randomized algorithms",
"Failure detectors (โP)",
"Partial synchrony assumptions",
"Weaker consistency models"
]
}
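A quick driver over the model checker above (only names defined in this section):
# Survey consensus solvability across the encoded system models
models = ['async_crash', 'async_byzantine', 'partial_sync_crash',
          'sync_crash', 'sync_byzantine', 'anonymous']
for model, verdict in ImpossibilityResults.consensus_impossibility(models).items():
    print(f"{model}: {verdict}")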
class ImpossibilityDemonstrator:
"""Demonstrate impossibility through simulation"""
def __init__(self):
self.configurations = set()
self.schedules = []
def demonstrate_flp(self, processes: int):
"""Demonstrate FLP through state exploration"""
print(f"\n=== Demonstrating FLP for {processes} processes ===")
# Initial states
initial_states = self._generate_initial_states(processes)
print(f"Initial configurations: {len(initial_states)}")
# Explore reachable states
reachable = self._explore_reachable(initial_states)
# Check for bivalent configurations
bivalent = self._find_bivalent(reachable)
print(f"Bivalent configurations found: {len(bivalent) > 0}")
if bivalent:
print("\nFound bivalent configuration that can be kept bivalent:")
print(self._format_configuration(next(iter(bivalent))))
def _generate_initial_states(self, processes: int) -> Set:
"""Generate all possible initial states"""
# Each process has value 0 or 1
states = set()
for values in itertools.product([0, 1], repeat=processes):
states.add(tuple(values))
return states
def _explore_reachable(self, initial_states: Set) -> Set:
"""Explore reachable configurations"""
visited = set()
stack = list(initial_states)
while stack:
config = stack.pop()
if config in visited:
continue
visited.add(config)
# Generate next configurations
# Simulate process steps
for next_config in self._next_configurations(config):
if next_config not in visited:
stack.append(next_config)
return visited
def _next_configurations(self, config: tuple) -> List[tuple]:
"""Generate possible next configurations"""
next_configs = []
n = len(config)
# Simulate each process taking a step
for i in range(n):
# Process i crashes (stops)
crashed_config = config[:i] + ('X',) + config[i+1:]
next_configs.append(crashed_config)
# Process i takes step (changes value)
new_value = 1 - config[i] if config[i] in [0, 1] else config[i]
stepped_config = config[:i] + (new_value,) + config[i+1:]
next_configs.append(stepped_config)
return next_configs
def _find_bivalent(self, configurations: Set) -> Set:
"""Find bivalent configurations"""
bivalent = set()
for config in configurations:
reachable_decisions = self._reachable_decisions(config)
if 0 in reachable_decisions and 1 in reachable_decisions:
bivalent.add(config)
return bivalent
def _reachable_decisions(self, config: tuple) -> Set:
"""Find all decisions reachable from configuration"""
decisions = set()
# Simplified: decision is when all non-crashed agree
if all(v == 0 or v == 'X' for v in config):
decisions.add(0)
if all(v == 1 or v == 'X' for v in config):
decisions.add(1)
return decisions
def _format_configuration(self, config: tuple) -> str:
"""Format configuration for display"""
return f"[{', '.join(str(v) for v in config)}]"
# Example: Understanding impossibility in practice
class PracticalImpossibilities:
"""Real-world implications of impossibility results"""
@staticmethod
def database_implications():
"""How impossibility affects database design"""
implications = {
'ACID_transactions': [
"Isolation levels trade off consistency for performance",
"Serializable isolation expensive, often not used",
"Snapshot isolation common compromise"
],
'distributed_transactions': [
"Two-phase commit blocks on coordinator failure",
"Three-phase commit avoids blocking but more complex",
"Paxos/Raft used for consensus instead"
],
'replication': [
"Synchronous replication: consistent but slow",
"Asynchronous replication: fast but can lose data",
"Semi-synchronous: compromise"
]
}
return implications
@staticmethod
def system_design_choices():
"""Design choices forced by impossibility"""
choices = [
{
'problem': "Leader election",
'impossibility': "Anonymous networks require Ω(n) time",
'solution': "Use unique IDs or random delays"
},
{
'problem': "Consensus",
'impossibility': "Async + crash faults = no termination guarantee",
'solution': "Use failure detectors or randomization"
},
{
'problem': "Byzantine fault tolerance",
'impossibility': "Need n > 3f for agreement",
'solution': "Use larger clusters or trusted components"
},
{
'problem': "Coordinated attack",
'impossibility': "Need infinite messages for certainty",
'solution': "Accept probabilistic guarantees"
}
]
return choices
@staticmethod
def working_around_impossibility():
"""Practical workarounds for theoretical impossibilities"""
workarounds = {
'flp': [
"Use randomized algorithms (e.g., Ben-Or's algorithm)",
"Use failure detectors (e.g., โP for Paxos)",
"Assume partial synchrony (most real systems do)",
"Accept occasional non-termination (with timeouts)"
],
'cap': [
"Choose AP or CP based on application needs",
"Use tunable consistency (e.g., Cassandra quorums)",
"Separate data into different consistency tiers",
"Use conflict resolution for AP systems"
],
'byzantine': [
"Use cryptographic signatures",
"Increase replication factor (n > 3f)",
"Use trusted hardware (SGX, TPM)",
"Economic incentives (blockchain-style)"
]
}
return workarounds
Key Impossibility Results:
- FLP: no deterministic async consensus with crash faults
- CAP: can't have all three properties during partitions
- Two Generals: perfect coordination over unreliable links is impossible
- Byzantine Generals: need n > 3f with oral messages
- Consensus Lower Bounds: f+1 rounds minimum, Ω(n²) messages
12. Wait-Free vs Lock-Free Concurrency
Understanding non-blocking synchronization.
import threading
import time
from typing import Optional, Any, Dict
from dataclasses import dataclass
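Python has no built-in atomic module, so everywhere this section says atomic.AtomicReference it is assuming one. Here's a minimal lock-based stand-in (the names AtomicReference, AtomicInteger, and AtomicBoolean are our own) so the snippets below run as written; a real implementation would map compare_and_set onto a hardware CAS instruction:
import types

class AtomicReference:
    """Lock-based stand-in for an atomic reference supporting CAS."""
    def __init__(self, value=None):
        self._value = value
        self._lock = threading.Lock()
    def get(self):
        with self._lock:
            return self._value
    def set(self, value):
        with self._lock:
            self._value = value
    def compare_and_set(self, expected, new):
        # Publish `new` only if the current object is still `expected`
        with self._lock:
            if self._value is expected:
                self._value = new
                return True
            return False

class AtomicInteger(AtomicReference):
    def __init__(self, value=0):
        super().__init__(value)
    def compare_and_set(self, expected, new):
        # Value equality is the right notion for counters and versions
        with self._lock:
            if self._value == expected:
                self._value = new
                return True
            return False
    def increment_and_get(self):
        with self._lock:
            self._value += 1
            return self._value

class AtomicBoolean(AtomicReference):
    def __init__(self, value=False):
        super().__init__(value)

# Namespace so the `atomic.X` spelling used below works unchanged
atomic = types.SimpleNamespace(AtomicReference=AtomicReference,
                               AtomicInteger=AtomicInteger,
                               AtomicBoolean=AtomicBoolean)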
class ConcurrencyPrimitive:
"""Base class for concurrency primitives"""
def __init__(self):
self.operations = 0
self.contention = 0
def benchmark(self, threads: int, operations: int) -> Dict:
"""Benchmark performance under contention"""
results = {
'throughput': 0,
'latency': 0,
'scalability': 0,
'fairness': 0
}
return results
class LockBasedStack:
"""Stack using locks (blocking)"""
def __init__(self):
self.stack = []
self.lock = threading.Lock()
def push(self, item: Any):
with self.lock:
self.stack.append(item)
def pop(self) -> Optional[Any]:
with self.lock:
if not self.stack:
return None
return self.stack.pop()
def size(self) -> int:
with self.lock:
return len(self.stack)
class LockFreeStack:
"""Lock-free stack using CAS (Compare-And-Swap)"""
@dataclass
class Node:
value: Any
next: Optional['LockFreeStack.Node'] = None
def __init__(self):
self.head = None
# Use atomic reference for head pointer
self._head = atomic.AtomicReference(self.head)
def push(self, item: Any):
new_node = self.Node(item)
while True:
current_head = self._head.get()
new_node.next = current_head
# Try to swap head pointer
if self._head.compare_and_set(current_head, new_node):
return
def pop(self) -> Optional[Any]:
while True:
current_head = self._head.get()
if current_head is None:
return None
next_node = current_head.next
# Try to swap head pointer
if self._head.compare_and_set(current_head, next_node):
return current_head.value
def size(self) -> int:
"""Non-atomic size (approximate)"""
count = 0
current = self._head.get()
while current:
count += 1
current = current.next
return count
class WaitFreeStack:
"""
Wait-free stack using helping mechanism
Wait-free: Every operation completes in bounded number of steps
regardless of contention
"""
@dataclass
class Operation:
op_type: str # 'push' or 'pop'
value: Any = None
completed: bool = False
result: Any = None
def __init__(self):
self.operations = [] # Shared array for operation descriptors
self.help_counter = atomic.AtomicInteger(0)
def push(self, item: Any) -> bool:
op = self.Operation('push', item)
self.operations.append(op)
# Help other operations while waiting
self._help_others()
# Complete own operation
return self._complete_operation(op)
def pop(self) -> Optional[Any]:
op = self.Operation('pop')
self.operations.append(op)
# Help other operations
self._help_others()
# Wait for completion (bounded steps)
return self._complete_operation(op)
def _help_others(self):
"""Help other threads complete their operations"""
for i in range(len(self.operations)):
op = self.operations[i]
if not op.completed:
self._execute_operation(op)
def _execute_operation(self, op: 'Operation'):
"""Execute an operation"""
if op.op_type == 'push':
# Actual push implementation
# Uses atomic operations
pass
elif op.op_type == 'pop':
# Actual pop implementation
pass
def _complete_operation(self, op: 'Operation'):
"""Wait for operation to complete (bounded steps)"""
steps = 0
max_steps = 100 # Bound for wait-free guarantee
while not op.completed and steps < max_steps:
self._help_others()
steps += 1
# Capped backoff: an uncapped 2**steps would sleep effectively forever
time.sleep(min(0.001, 0.000001 * (2 ** min(steps, 10))))
return op.result
class ObstructionFreeStack:
"""
Obstruction-free stack (weakest non-blocking guarantee)
Obstruction-free: Guarantees progress only when no contention
"""
def __init__(self):
self.stack = []
self.version = atomic.AtomicInteger(0)
def push(self, item: Any) -> bool:
while True:
# Read current state
current_version = self.version.get()
current_stack = self.stack.copy()
# Modify local copy
new_stack = current_stack + [item]
# Try to commit
if self.version.compare_and_set(current_version, current_version + 1):
self.stack = new_stack
return True
# Contention detected, backoff
time.sleep(0.000001)
def pop(self) -> Optional[Any]:
while True:
current_version = self.version.get()
if not self.stack:
return None
current_stack = self.stack.copy()
item = current_stack[-1]
new_stack = current_stack[:-1]
if self.version.compare_and_set(current_version, current_version + 1):
self.stack = new_stack
return item
time.sleep(0.000001)
# Performance comparison
class ConcurrencyBenchmark:
"""Benchmark different concurrency approaches"""
def __init__(self):
self.results = {}
def run_benchmark(self, num_threads: int, ops_per_thread: int):
"""Run comprehensive benchmark"""
print(f"\n=== Concurrency Benchmark ({num_threads} threads, {ops_per_thread} ops each) ===")
# Test lock-based
print("\n1. Lock-based stack:")
lock_time = self._test_stack(LockBasedStack(), num_threads, ops_per_thread)
# Test lock-free
print("\n2. Lock-free stack:")
lockfree_time = self._test_stack(LockFreeStack(), num_threads, ops_per_thread)
# Test wait-free
print("\n3. Wait-free stack:")
waitfree_time = self._test_stack(WaitFreeStack(), num_threads, ops_per_thread)
# Test obstruction-free
print("\n4. Obstruction-free stack:")
obstruction_time = self._test_stack(ObstructionFreeStack(), num_threads, ops_per_thread)
# Print results
print(f"\n=== Results ===")
print(f"Lock-based: {lock_time:.3f}s")
print(f"Lock-free: {lockfree_time:.3f}s")
print(f"Wait-free: {waitfree_time:.3f}s")
print(f"Obstruction-free: {obstruction_time:.3f}s")
return {
'lock_based': lock_time,
'lock_free': lockfree_time,
'wait_free': waitfree_time,
'obstruction_free': obstruction_time
}
def _test_stack(self, stack, num_threads: int, ops_per_thread: int) -> float:
"""Test specific stack implementation"""
threads = []
start_time = time.time()
def worker(thread_id):
for i in range(ops_per_thread):
# Mix of pushes and pops
if i % 2 == 0:
stack.push(f"thread-{thread_id}-{i}")
else:
stack.pop()
# Create and start threads
for i in range(num_threads):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# Wait for completion
for t in threads:
t.join()
end_time = time.time()
return end_time - start_time
# Progress guarantee hierarchy
progress_hierarchy = {
'blocking': {
'level': 'No progress guarantee',
'examples': ['mutex', 'semaphore', 'condition variable'],
'properties': ['Can deadlock', 'Can livelock', 'Priority inversion possible'],
'use_cases': ['Simple synchronization', 'When fairness needed']
},
'obstruction_free': {
'level': 'Weakest non-blocking',
'examples': ['Some optimistic algorithms'],
'properties': ['Progress only with no contention', 'Can starve'],
'use_cases': ['Low contention scenarios', 'Best-effort systems']
},
'lock_free': {
'level': 'System-wide progress',
'examples': ['CAS-based structures', 'Atomic operations'],
'properties': ['Some thread always makes progress', 'Individual threads can starve'],
'use_cases': ['High contention', 'Real-time systems (soft)']
},
'wait_free': {
'level': 'Strongest non-blocking',
'examples': ['Helping algorithms', 'Hardware transactions'],
'properties': ['Every operation completes in bounded steps', 'No starvation'],
'use_cases': ['Hard real-time', 'Mission critical']
}
}
# Implementation techniques
class NonBlockingTechniques:
"""Techniques for implementing non-blocking algorithms"""
@staticmethod
def compare_and_swap():
"""CAS (Compare-And-Swap) primitive"""
return {
'operation': "atomic_compare_exchange_strong(ptr, expected, desired)",
'effect': "If *ptr == expected, set *ptr = desired, return true",
'hardware': "Available on all modern CPUs (CMPXCHG on x86)",
'limitations': "ABA problem, requires careful memory management"
}
@staticmethod
def load_linked_store_conditional():
"""LL/SC (Load-Linked/Store-Conditional)"""
return {
'operation': "Two instructions: LL loads, SC stores if no interference",
'advantage': "No ABA problem",
'hardware': "ARM, PowerPC, MIPS",
'limitations': "Can spuriously fail"
}
@staticmethod
def transactional_memory():
"""Hardware Transactional Memory"""
return {
'operation': "Atomic block of operations",
'hardware': "Intel TSX, IBM PowerTM",
'advantages': ["Simpler programming", "Composability"],
'limitations': ["Capacity constraints", "Conflict detection overhead"]
}
@staticmethod
def helping_mechanism():
"""Wait-free helping"""
return {
'technique': "Threads help each other complete operations",
'examples': ["Universal constructions", "Herlihy's wait-free consensus"],
'properties': ["Bounded steps", "High overhead"],
'use': "When wait-free guarantee is required"
}
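The CAS retry loop is the workhorse idiom behind all of the above. A minimal lock-free counter sketch, using the AtomicInteger stand-in defined earlier in this section:
def lock_free_increment(counter):
    """Classic CAS retry loop: read, compute, try to publish, retry on loss."""
    while True:
        current = counter.get()
        # Succeeds only if no other thread changed the value in between
        if counter.compare_and_set(current, current + 1):
            return current + 1

counter = atomic.AtomicInteger(0)
workers = [threading.Thread(target=lambda: [lock_free_increment(counter)
                                            for _ in range(1000)])
           for _ in range(4)]
for t in workers: t.start()
for t in workers: t.join()
print(counter.get())  # 4000: no lost updates despite contention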
# Example: Non-blocking hash table
class NonBlockingHashTable:
"""Lock-free hash table using CAS"""
def __init__(self, size: int = 16):
self.size = size
self.buckets = [atomic.AtomicReference(None) for _ in range(size)]
self.resize_lock = atomic.AtomicBoolean(False)
def _hash(self, key: str) -> int:
return hash(key) % self.size
def put(self, key: str, value: Any) -> bool:
index = self._hash(key)
while True:
# Read current bucket head
current_head = self.buckets[index].get()
# Check if key already exists
node = current_head
while node:
if node.key == key:
# Update existing key (simplified: the CAS below is only correct when
# the match is the head; a full version would copy the chain prefix)
new_node = self.Node(key, value, node.next)
if self.buckets[index].compare_and_set(current_head, new_node):
return True
else:
# CAS failed, retry
break
node = node.next
# Key doesn't exist, add new node
new_node = self.Node(key, value, current_head)
if self.buckets[index].compare_and_set(current_head, new_node):
return True
def get(self, key: str) -> Optional[Any]:
index = self._hash(key)
node = self.buckets[index].get()
while node:
if node.key == key:
return node.value
node = node.next
return None
def remove(self, key: str) -> bool:
index = self._hash(key)
while True:
current_head = self.buckets[index].get()
# Find node to remove
prev = None
curr = current_head
while curr:
if curr.key == key:
# Found it
if prev is None:
# Removing head
new_head = curr.next
else:
# Removing middle node
prev.next = curr.next
new_head = current_head
if self.buckets[index].compare_and_set(current_head, new_head):
return True
else:
# CAS failed, retry whole operation
break
prev = curr
curr = curr.next
# Key not found
return False
class Node:
def __init__(self, key: str, value: Any, next_node: Optional['NonBlockingHashTable.Node'] = None):
self.key = key
self.value = value
self.next = next_node
Concurrency Guarantees Hierarchy:
- Blocking: Mutexes, semaphores (can deadlock)
- Obstruction-Free: Progress only without contention
- Lock-Free: System makes progress (some thread succeeds)
- Wait-Free: Every thread completes in bounded steps
13. Queueing Theory: Modeling System Performance
Mathematical models for predicting system behavior under load.
import numpy as np
from scipy import stats, special
from typing import Tuple, List, Dict, Optional
import math
class QueueingModels:
"""Mathematical queueing models for performance analysis"""
@staticmethod
def mm1_queue(arrival_rate: float, service_rate: float) -> Dict:
"""
M/M/1 queue: Markovian arrivals, Markovian service, 1 server
λ = arrival rate (requests/second)
μ = service rate (requests/second)
ρ = λ/μ = utilization (must be < 1 for stability)
"""
if arrival_rate >= service_rate:
raise ValueError("System unstable: λ >= μ")
rho = arrival_rate / service_rate
results = {
'utilization': rho,
'expected_customers_system': rho / (1 - rho),
'expected_customers_queue': rho**2 / (1 - rho),
'expected_wait_system': 1 / (service_rate - arrival_rate),
'expected_wait_queue': arrival_rate / (service_rate * (service_rate - arrival_rate)),
'probability_system_empty': 1 - rho,
'probability_n_customers': lambda n: (1 - rho) * rho**n
}
return results
@staticmethod
def mg1_queue(arrival_rate: float,
service_rate: float,
service_variance: float) -> Dict:
"""
M/G/1 queue: Markovian arrivals, General service distribution
Uses the Pollaczek-Khinchine formula
"""
if arrival_rate >= service_rate:
raise ValueError("System unstable: λ >= μ")
rho = arrival_rate / service_rate
service_time = 1 / service_rate
# Pollaczek-Khinchine formula
w_q = (arrival_rate * (service_variance + service_time**2)) / (2 * (1 - rho))
w = w_q + service_time
l_q = arrival_rate * w_q
l = arrival_rate * w
results = {
'utilization': rho,
'expected_customers_queue': l_q,
'expected_customers_system': l,
'expected_wait_queue': w_q,
'expected_wait_system': w,
'service_time_variance': service_variance,
'squared_coefficient_variation': service_variance / (service_time**2)
}
return results
@staticmethod
def gg1_queue(arrival_rate: float,
service_rate: float,
cv_a: float, # Coefficient of variation for arrivals
cv_s: float) -> Dict: # Coefficient of variation for service
"""
G/G/1 queue: General arrivals, General service
Kingman's approximation formula
"""
if arrival_rate >= service_rate:
raise ValueError("System unstable: ฮป >= ฮผ")
rho = arrival_rate / service_rate
service_time = 1 / service_rate
# Kingman's approximation
w_q_approx = (rho / (1 - rho)) * ((cv_a**2 + cv_s**2) / 2) * service_time
w_approx = w_q_approx + service_time
l_q_approx = arrival_rate * w_q_approx
l_approx = arrival_rate * w_approx
results = {
'utilization': rho,
'approximate_wait_queue': w_q_approx,
'approximate_wait_system': w_approx,
'approximate_customers_queue': l_q_approx,
'approximate_customers_system': l_approx,
'arrival_cv': cv_a,
'service_cv': cv_s,
'note': 'Results are approximations for G/G/1'
}
return results
@staticmethod
def mmc_queue(arrival_rate: float, service_rate: float, servers: int) -> Dict:
"""
M/M/c queue: Multiple servers
c = number of identical servers
"""
if arrival_rate >= servers * service_rate:
raise ValueError(f"System unstable: ฮป >= {servers}ฮผ")
rho = arrival_rate / (servers * service_rate)
# Probability of 0 customers in system
sum_term = 0
for n in range(servers):
sum_term += (arrival_rate / service_rate)**n / math.factorial(n)
p0 = 1 / (sum_term +
(arrival_rate / service_rate)**servers /
(math.factorial(servers) * (1 - rho)))
# Probability all servers busy
c = servers
p_queue = ((arrival_rate / service_rate)**c /
(math.factorial(c) * (1 - rho))) * p0
# Expected number in queue
l_q = (rho * p_queue) / (1 - rho)
# Expected wait in queue
w_q = l_q / arrival_rate
# Expected wait in system
w = w_q + 1 / service_rate
# Expected number in system
l = arrival_rate * w
results = {
'utilization_per_server': rho,
'total_utilization': arrival_rate / service_rate,
'probability_all_servers_busy': p_queue,
'expected_customers_queue': l_q,
'expected_customers_system': l,
'expected_wait_queue': w_q,
'expected_wait_system': w,
'probability_system_empty': p0
}
return results
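A quick worked example ties the formulas to numbers (values chosen for illustration). At λ = 80 req/s against μ = 100 req/s, utilization is 0.8 and the queue is already painful; it also shows the pooling effect of a shared multi-server queue:
# M/M/1 at 80% utilization: L = ρ/(1-ρ) = 4, W = 1/(μ-λ) = 50 ms
m = QueueingModels.mm1_queue(arrival_rate=80, service_rate=100)
print(f"M/M/1: utilization {m['utilization']:.2f}, "
      f"L = {m['expected_customers_system']:.1f}, "
      f"W = {m['expected_wait_system']*1000:.0f} ms")

# Pooling: one shared queue over two μ=50 servers (M/M/2) beats
# two independent M/M/1 queues at λ=40, μ=50 each (W = 100 ms there)
m2 = QueueingModels.mmc_queue(arrival_rate=80, service_rate=50, servers=2)
print(f"M/M/2: W = {m2['expected_wait_system']*1000:.0f} ms")  # ~56 ms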
class NetworkOfQueues:
"""Models for networks of queues (Jackson networks)"""
@staticmethod
def jackson_network(service_rates: List[float],
routing_matrix: np.ndarray,
arrival_rates: List[float]) -> Dict:
"""
Jackson network: Open network of M/M/1 queues
routing_matrix[i][j] = probability going from i to j
routing_matrix[i][0] = probability leaving system from i
"""
n = len(service_rates)
# Solve traffic equations: λ_i = γ_i + Σ_j λ_j * p_ji
# where γ_i is the external arrival rate to node i
# In matrix form: (I - Pᵀ)λ = γ
P = routing_matrix
I = np.eye(n)
# Solve the linear system
A = I - P.T  # transpose: traffic into i arrives along p_ji
lambda_vec = np.linalg.solve(A, arrival_rates)
# Check stability: ฮป_i < ฮผ_i for all i
stable = all(lambda_i < mu_i for lambda_i, mu_i in zip(lambda_vec, service_rates))
if not stable:
raise ValueError("Network unstable: Some nodes have ฮป >= ฮผ")
# Calculate performance for each node
node_performance = []
for i in range(n):
lambda_i = lambda_vec[i]
mu_i = service_rates[i]
rho_i = lambda_i / mu_i
node_result = {
'arrival_rate': lambda_i,
'service_rate': mu_i,
'utilization': rho_i,
'expected_customers': rho_i / (1 - rho_i),
'expected_wait': 1 / (mu_i - lambda_i)
}
node_performance.append(node_result)
# Overall network performance
total_arrival = sum(arrival_rates)
total_departure = sum(lambda_vec * (1 - np.sum(P, axis=1))) # Leaving system
results = {
'stable': stable,
'node_performance': node_performance,
'total_throughput': total_departure,
'total_arrival': total_arrival,
'utilization_vector': [p['utilization'] for p in node_performance],
'bottleneck_node': np.argmax([p['utilization'] for p in node_performance])
}
return results
@staticmethod
def mean_value_analysis(service_demands: List[float],
think_time: float,
users: int) -> Dict:
"""
Mean Value Analysis for closed queueing networks
Used for modeling systems with fixed number of users
(e.g., database connection pools)
"""
n = len(service_demands) # Number of devices (queues)
N = users # Number of users
# Initialize
response_times = [0] * n
throughputs = [0] * n
queue_lengths = [0] * n
# Iterative solution
for k in range(1, N + 1):
# Calculate response times
for i in range(n):
response_times[i] = service_demands[i] * (1 + queue_lengths[i])
# Calculate system response time
R = sum(response_times)
# Calculate throughput
X = k / (think_time + R)
# Calculate queue lengths
for i in range(n):
queue_lengths[i] = X * response_times[i]
results = {
'users': N,
'throughput': X,
'system_response_time': R,
'device_response_times': response_times,
'device_queue_lengths': queue_lengths,
'device_utilizations': [X * d for d in service_demands]
}
return results
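For instance, modeling a pool of users hammering a CPU and a disk (demand numbers are purely illustrative):
# 30 users, 1 s think time, service demands: CPU 40 ms, disk 30 ms per request
mva = NetworkOfQueues.mean_value_analysis(
    service_demands=[0.040, 0.030],
    think_time=1.0,
    users=30
)
print(f"throughput: {mva['throughput']:.1f} req/s")
print(f"response time: {mva['system_response_time']*1000:.0f} ms")
print(f"CPU utilization: {mva['device_utilizations'][0]:.2f}")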
class PerformanceMetrics:
"""Key performance metrics from queueing theory"""
@staticmethod
def little_law(arrival_rate: Optional[float],
response_time: Optional[float],
queue_length: Optional[float] = None) -> Dict:
"""
Little's Law: L = λW
L = average number in system
λ = arrival rate
W = average time in system
Pass any two of the three quantities; the third is derived.
"""
if queue_length is None:
# Derive L from λ and W
L = arrival_rate * response_time
return {'customers_in_system': L}
elif response_time is None:
# Derive W from λ and L
W = queue_length / arrival_rate
return {'response_time': W}
else:
# Derive λ from L and W
lambda_calc = queue_length / response_time
return {'arrival_rate': lambda_calc}
@staticmethod
def utilization_law(arrival_rate: float,
service_rate: float,
servers: int = 1) -> float:
"""
Utilization Law: ρ = λ / (cμ)
ρ = utilization
c = number of servers
"""
return arrival_rate / (servers * service_rate)
@staticmethod
def response_time_law(service_time: float,
utilization: float,
cv_a: float = 1.0,
cv_s: float = 1.0) -> float:
"""
Response time approximation
R = S / (1 - ρ) for M/M/1
More generally: R ≈ S * (1 + (ρ/(1-ρ)) * ((cv_a² + cv_s²)/2))
"""
if utilization >= 1:
return float('inf')
# Kingman's approximation
return service_time * (1 + (utilization / (1 - utilization)) *
((cv_a**2 + cv_s**2) / 2))
@staticmethod
def throughput_bounds(service_rate: float,
servers: int,
think_time: float,
users: int) -> Tuple[float, float]:
"""
Asymptotic bounds on throughput for a closed system
Upper bound: X ≤ min(c·μ, N / (Z + D))
Lower bound: X ≥ N / (Z + N·D)
where D = total service demand, Z = think time, N = users
"""
# Single-device case: total service demand D = 1/μ
D = 1 / service_rate
# Pessimistic bound: every user queues behind all the others
lower_bound = users / (think_time + users * D)
# Optimistic bound: no queueing, capped by total server capacity
upper_bound = min(service_rate * servers,
users / (think_time + D))
return lower_bound, upper_bound
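Little's Law is the one formula worth memorizing. A worked example with illustrative numbers: a service absorbing 200 req/s with 50 ms average time in system must hold about 10 requests in flight at any instant:
# L = λW = 200 req/s × 0.05 s = 10 requests in flight
result = PerformanceMetrics.little_law(arrival_rate=200, response_time=0.05)
print(result)  # {'customers_in_system': 10.0}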
# Example: Modeling a web service
class WebServiceModel:
"""Queueing model for a web service"""
def __init__(self, config: Dict):
self.config = config
# Service components
self.frontend_rate = config.get('frontend_rate', 1000) # req/s
self.backend_rate = config.get('backend_rate', 500) # req/s
self.database_rate = config.get('database_rate', 200) # req/s
# Network delays
self.network_delay = config.get('network_delay', 0.010) # 10ms
# Routing probabilities
self.routing = config.get('routing', {
'frontend_to_backend': 0.8,
'frontend_to_database': 0.2,
'backend_to_database': 0.6,
'backend_to_frontend': 0.4
})
def model_performance(self, arrival_rate: float) -> Dict:
"""Model system performance under given load"""
# Frontend queue (M/M/1)
frontend = QueueingModels.mm1_queue(
arrival_rate=arrival_rate,
service_rate=self.frontend_rate
)
# Backend queue (M/G/1 with variable service times)
backend_arrival = arrival_rate * self.routing['frontend_to_backend']
backend = QueueingModels.mg1_queue(
arrival_rate=backend_arrival,
service_rate=self.backend_rate,
service_variance=0.001 # Example variance
)
# Database queue (M/M/c with connection pooling)
database_arrival = (arrival_rate * self.routing['frontend_to_database'] +
backend_arrival * self.routing['backend_to_database'])
database = QueueingModels.mmc_queue(
arrival_rate=database_arrival,
service_rate=self.database_rate,
servers=50 # Connection pool size
)
# Total response time
total_response = (
frontend['expected_wait_system'] +
backend['expected_wait_system'] +
database['expected_wait_system'] +
self.network_delay * 3 # Three network hops
)
# System throughput (limited by bottleneck)
bottleneck_utilization = max(
frontend['utilization'],
backend['utilization'],
database['utilization_per_server']
)
# Capacity planning
max_capacity = min(
self.frontend_rate * 0.8, # 80% utilization target
self.backend_rate * 0.8,
self.database_rate * 50 * 0.8 # 50 connections * 80%
)
results = {
'arrival_rate': arrival_rate,
'total_response_time': total_response,
'frontend_response': frontend['expected_wait_system'],
'backend_response': backend['expected_wait_system'],
'database_response': database['expected_wait_system'],
'bottleneck_utilization': bottleneck_utilization,
'max_recommended_capacity': max_capacity,
'predicted_p95_response': total_response * 2.0, # Approximation
'predicted_p99_response': total_response * 3.0 # Approximation
}
return results
def sensitivity_analysis(self,
arrival_rates: List[float]) -> List[Dict]:
"""Analyze performance across different load levels"""
results = []
for rate in arrival_rates:
try:
perf = self.model_performance(rate)
perf['stable'] = True
except ValueError as e:
# System unstable at this rate
perf = {
'arrival_rate': rate,
'stable': False,
'error': str(e)
}
results.append(perf)
return results
def find_saturation_point(self,
target_response: float = 1.0,
max_rate: float = 10000) -> float:
"""Find arrival rate where response time exceeds target"""
low = 0
high = max_rate
for _ in range(20): # Binary search
mid = (low + high) / 2
try:
perf = self.model_performance(mid)
if perf['total_response_time'] > target_response:
high = mid
else:
low = mid
except ValueError:
# Unstable at mid rate
high = mid
return (low + high) / 2
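Exercising the model with its default rates (all outputs follow from the illustrative config above, not from any real system):
# Model the default service at 100 req/s, then find its saturation point
model = WebServiceModel(config={})
perf = model.model_performance(arrival_rate=100)
print(f"total response time: {perf['total_response_time']*1000:.1f} ms")
print(f"bottleneck utilization: {perf['bottleneck_utilization']:.2f}")
saturation = model.find_saturation_point(target_response=1.0)
print(f"saturation point: ~{saturation:.0f} req/s")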
# Practical applications
class QueueingApplications:
"""Real applications of queueing theory"""
@staticmethod
def capacity_planning(current_load: float,
growth_rate: float,
sla_response: float,
model: WebServiceModel) -> Dict:
"""Plan capacity based on growth projections"""
# Project load over time
months = 12
monthly_loads = [current_load * (1 + growth_rate)**i
for i in range(months + 1)]
# Find when SLA will be violated
violation_month = None
for month, load in enumerate(monthly_loads):
try:
perf = model.model_performance(load)
if perf['total_response_time'] > sla_response:
violation_month = month
break
except ValueError:
violation_month = month
break
# Recommendation
if violation_month is None:
recommendation = f"Current capacity sufficient for {months} months"
else:
recommendation = (f"Need capacity upgrade before month "
f"{violation_month} (load: {monthly_loads[violation_month]:.0f} req/s)")
return {
'current_load': current_load,
'growth_rate': growth_rate,
'sla_response': sla_response,
'violation_month': violation_month,
'recommendation': recommendation,
'projected_loads': monthly_loads
}
@staticmethod
def autoscaling_policy(utilization_target: float = 0.7,
scale_out_threshold: float = 0.8,
scale_in_threshold: float = 0.4,
cooldown_period: int = 300) -> Dict:
"""Design autoscaling policy based on queueing theory"""
policy = {
'metrics': [
'CPU utilization (smoothed over 5 minutes)',
'Request queue length',
'Average response time',
'Error rate'
],
'scale_out_conditions': [
f'Utilization > {scale_out_threshold} for 3 consecutive minutes',
f'Queue length > 100 for 2 minutes',
f'P95 response time > SLA for 5 minutes'
],
'scale_in_conditions': [
f'Utilization < {scale_in_threshold} for 10 minutes',
f'Queue length < 10 for 5 minutes'
],
'cooldown_period': cooldown_period,
'max_instances': 'Based on cost budget',
'min_instances': 'Based on baseline load'
}
# Calculate optimal thresholds using queueing theory
# For M/M/c, want ρ ≈ utilization_target
optimal_servers = lambda arrival_rate, service_rate: math.ceil(
arrival_rate / (service_rate * utilization_target)
)
policy['optimal_scaling'] = optimal_servers
return policy
@staticmethod
def sla_compliance(observed_response_times: List[float],
sla_p95: float,
sla_p99: float) -> Dict:
"""Check SLA compliance using queueing theory predictions"""
# Empirical percentiles
sorted_times = np.sort(observed_response_times)
n = len(sorted_times)
p95_empirical = sorted_times[int(0.95 * n)]
p99_empirical = sorted_times[int(0.99 * n)]
# Fit distribution (exponential for M/M/1, normal for heavy tail)
mean_response = np.mean(observed_response_times)
# For exponential distribution: p95 = mean * -ln(0.05) ≈ mean * 3
# p99 = mean * -ln(0.01) ≈ mean * 4.6
p95_predicted = mean_response * 3
p99_predicted = mean_response * 4.6
compliance = {
'observed_p95': p95_empirical,
'observed_p99': p99_empirical,
'predicted_p95': p95_predicted,
'predicted_p99': p99_predicted,
'sla_p95_violation': p95_empirical > sla_p95,
'sla_p99_violation': p99_empirical > sla_p99,
'distribution_fit': 'Exponential (M/M/1 approximation)',
'recommendation': 'Consider G/G/1 model if heavy tail observed'
}
return compliance
Queueing Theory Key Formulas:
- Little's Law: L = λW
- Utilization Law: ρ = λ/μ
- M/M/1: L = ρ/(1-ρ), W = 1/(μ-λ)
- Pollaczek-Khinchine (M/G/1): W_q = λ(σ² + 1/μ²) / (2(1-ρ))
- Kingman's Approximation (G/G/1): W_q ≈ (ρ/(1-ρ)) · ((c_a² + c_s²)/2) · (1/μ)
14. Power of d Choices: Intelligent Load Balancing
Theoretical foundation for smarter load balancing algorithms.
import random
import numpy as np
from typing import List, Dict, Tuple
import math
from collections import Counter, deque
class PowerOfDChoices:
"""
The Power of d Choices load balancing
Instead of random assignment (d=1) or checking all (d=n),
check d random servers and pick least loaded
"""
def __init__(self, n_servers: int, d: int = 2):
self.n_servers = n_servers
self.d = d
self.loads = [0] * n_servers
self.queue_lengths = [0] * n_servers
# Statistics
self.requests_processed = 0
self.load_history = []
def dispatch_request(self, request_size: int = 1) -> int:
"""Dispatch request using power of d choices"""
# Sample d servers randomly
sampled_indices = random.sample(range(self.n_servers), self.d)
# Find least loaded among sampled
min_load = float('inf')
selected_server = sampled_indices[0]
for idx in sampled_indices:
if self.loads[idx] < min_load:
min_load = self.loads[idx]
selected_server = idx
# Assign request to selected server
self.loads[selected_server] += request_size
self.queue_lengths[selected_server] += 1
self.requests_processed += 1
# Record load distribution periodically
if self.requests_processed % 100 == 0:
self.load_history.append(self.loads.copy())
return selected_server
def process_request(self, server_idx: int, service_time: int = 1):
"""Simulate request processing"""
if self.loads[server_idx] > 0:
self.loads[server_idx] -= min(service_time, self.loads[server_idx])
self.queue_lengths[server_idx] = max(0, self.queue_lengths[server_idx] - 1)
def get_load_statistics(self) -> Dict:
"""Get statistics about load distribution"""
loads = self.loads
queue_lengths = self.queue_lengths
stats = {
'mean_load': np.mean(loads),
'std_load': np.std(loads),
'max_load': max(loads),
'min_load': min(loads),
'load_imbalance': max(loads) - min(loads),
'mean_queue_length': np.mean(queue_lengths),
'max_queue_length': max(queue_lengths),
'empty_servers': sum(1 for l in loads if l == 0),
'high_load_servers': sum(1 for l in loads if l > 2 * np.mean(loads))
}
# Gini coefficient for load inequality
sorted_loads = np.sort(loads)
n = len(sorted_loads)
cumulative = np.cumsum(sorted_loads)
gini = (n + 1 - 2 * np.sum(cumulative) / cumulative[-1]) / n
stats['gini_coefficient'] = gini
return stats
def theoretical_improvement(self) -> float:
"""
Expected maximum load (above the average):
d = 1 (random): Θ(log n / log log n); d ≥ 2: log log n / log d + O(1)
"""
if self.d == 1:
# Random assignment
return np.log(self.n_servers) / np.log(np.log(self.n_servers))
else:
# Power of d choices
return np.log(np.log(self.n_servers)) / np.log(self.d)
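A tiny simulation makes the theorem tangible: bumping d from 1 to 2 collapses the load imbalance, while d=3 adds comparatively little on top (exact numbers vary run to run):
random.seed(42)  # reproducible illustration

def simulate(d, servers=100, requests=10000):
    balancer = PowerOfDChoices(servers, d=d)
    for _ in range(requests):
        balancer.dispatch_request()
    return balancer.get_load_statistics()

for d in (1, 2, 3):
    stats = simulate(d)
    print(f"d={d}: max load {stats['max_load']}, "
          f"std {stats['std_load']:.2f}, "
          f"imbalance {stats['load_imbalance']}")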
class AdaptivePowerOfD:
"""
Adaptive d based on system load
Idea: Increase d when system is heavily loaded,
decrease d when lightly loaded to reduce sampling overhead
"""
def __init__(self, n_servers: int, max_d: int = 5):
self.n_servers = n_servers
self.max_d = max_d
self.loads = [0] * n_servers
# Adaptive parameters
self.current_d = 2
self.sampling_history = deque(maxlen=100)
def adaptive_dispatch(self, request_size: int = 1) -> int:
"""Dispatch with adaptive d"""
# Update d based on system state
self._update_d()
# Use current d for sampling
if self.current_d >= self.n_servers:
# Check all servers if d >= n
sampled_indices = list(range(self.n_servers))
else:
sampled_indices = random.sample(range(self.n_servers), self.current_d)
# Find least loaded
min_load = float('inf')
selected = sampled_indices[0]
for idx in sampled_indices:
if self.loads[idx] < min_load:
min_load = self.loads[idx]
selected = idx
# Assign request
self.loads[selected] += request_size
self.sampling_history.append(self.current_d)
return selected
def _update_d(self):
"""Adapt d based on system load"""
mean_load = np.mean(self.loads)
load_std = np.std(self.loads)
# High load or high variance → increase d
if mean_load > 0.7 * self.n_servers or load_std > 0.3 * mean_load:
self.current_d = min(self.current_d + 1, self.max_d)
# Low load and low variance → decrease d
elif mean_load < 0.3 * self.n_servers and load_std < 0.1 * mean_load:
self.current_d = max(self.current_d - 1, 1)
def get_adaptation_stats(self) -> Dict:
"""Get statistics about d adaptation"""
if not self.sampling_history:
return {'current_d': self.current_d}
history = list(self.sampling_history)
return {
'current_d': self.current_d,
'mean_d': np.mean(history),
'std_d': np.std(history),
'min_d': min(history),
'max_d': max(history),
'adaptation_frequency': len(set(history)) / len(history)
}
class WeightedPowerOfD:
"""
Weighted power of d choices
Servers have different capacities/weights
Choose server with minimum load/weight ratio
"""
def __init__(self, server_weights: List[float]):
self.weights = server_weights
self.n_servers = len(server_weights)
self.loads = [0] * self.n_servers
def weighted_dispatch(self, request_size: float, d: int = 2) -> int:
"""Dispatch considering server weights"""
sampled = random.sample(range(self.n_servers), min(d, self.n_servers))
# Find server with minimum load/weight ratio
min_ratio = float('inf')
selected = sampled[0]
for idx in sampled:
ratio = self.loads[idx] / self.weights[idx]
if ratio < min_ratio:
min_ratio = ratio
selected = idx
self.loads[selected] += request_size
return selected
def get_weighted_stats(self) -> Dict:
"""Statistics considering weights"""
ratios = [l / w for l, w in zip(self.loads, self.weights)]
return {
'mean_ratio': np.mean(ratios),
'std_ratio': np.std(ratios),
'max_ratio': max(ratios),
'min_ratio': min(ratios),
'fairness_index': self._jain_fairness_index(ratios)
}
def _jain_fairness_index(self, values: List[float]) -> float:
"""Jain's fairness index"""
n = len(values)
numerator = sum(values) ** 2
denominator = n * sum(v**2 for v in values)
return numerator / denominator if denominator > 0 else 0
class ConsistentHashingWithLoad:
"""
Combine consistent hashing with power of d choices
First use consistent hashing to map request to servers,
then use power of d choices among neighboring servers
"""
def __init__(self, n_servers: int, virtual_nodes: int = 100, d: int = 2):
self.n_servers = n_servers
self.virtual_nodes = virtual_nodes
self.d = d
# Consistent hashing ring
self.ring = {}
self.sorted_keys = []
# Server loads
self.loads = [0] * n_servers
# Initialize ring
self._initialize_ring()
def _initialize_ring(self):
"""Initialize consistent hashing ring"""
for server in range(self.n_servers):
for vnode in range(self.virtual_nodes):
key = hash(f"{server}-{vnode}") % (2**32)
self.ring[key] = server
self.sorted_keys.append(key)
self.sorted_keys.sort()
def dispatch(self, request_id: str, request_size: int = 1) -> int:
"""Dispatch using consistent hashing + power of d"""
# Hash request
request_hash = hash(request_id) % (2**32)
# Find primary server (consistent hashing)
primary_idx = self._find_server(request_hash)
# Get d-1 neighboring servers
neighbors = self._get_neighbors(primary_idx, self.d - 1)
# Consider primary + neighbors
candidates = [primary_idx] + neighbors
# Choose least loaded among candidates
min_load = float('inf')
selected = primary_idx
for server in candidates:
if self.loads[server] < min_load:
min_load = self.loads[server]
selected = server
# Assign request
self.loads[selected] += request_size
return selected
def _find_server(self, key: int) -> int:
"""Find server for key using consistent hashing"""
if not self.sorted_keys:
return 0
# Binary search for key
import bisect
idx = bisect.bisect_left(self.sorted_keys, key)
if idx == len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
def _get_neighbors(self, server_idx: int, count: int) -> List[int]:
"""Get neighboring servers in the ring"""
# Find position of server's virtual nodes
server_vnodes = [k for k, v in self.ring.items() if v == server_idx]
if not server_vnodes:
return []
# Take first virtual node as reference
ref_vnode = server_vnodes[0]
ref_idx = self.sorted_keys.index(ref_vnode)
# Get neighbors clockwise
neighbors = []
for offset in range(1, len(self.sorted_keys)):
neighbor_idx = (ref_idx + offset) % len(self.sorted_keys)
neighbor_server = self.ring[self.sorted_keys[neighbor_idx]]
if neighbor_server != server_idx and neighbor_server not in neighbors:
neighbors.append(neighbor_server)
if len(neighbors) >= count:
break
return neighbors
def add_server(self):
"""Add new server to the ring"""
new_server = self.n_servers
self.n_servers += 1
self.loads.append(0)
# Add virtual nodes for new server
for vnode in range(self.virtual_nodes):
key = hash(f"{new_server}-{vnode}") % (2**32)
self.ring[key] = new_server
self.sorted_keys.append(key)
self.sorted_keys.sort()
# Rebalance some load (in practice)
return new_server
def remove_server(self, server_idx: int):
"""Remove server from the ring"""
# Remove virtual nodes
keys_to_remove = [k for k, v in self.ring.items() if v == server_idx]
for key in keys_to_remove:
del self.ring[key]
self.sorted_keys.remove(key)
# Redistribute load (simplified)
total_load = self.loads[server_idx]
avg_redistribution = total_load / (self.n_servers - 1)
for i in range(self.n_servers):
if i != server_idx:
self.loads[i] += avg_redistribution
# Remove server
self.loads.pop(server_idx)
self.n_servers -= 1
# Update server indices in ring
new_ring = {}
for key, server in self.ring.items():
if server > server_idx:
new_ring[key] = server - 1
else:
new_ring[key] = server
self.ring = new_ring
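A quick usage sketch of the hybrid dispatcher above (the cluster size and request IDs are invented; Python's built-in hash() is salted per process, so exact placements vary between runs):

lb = ConsistentHashingWithLoad(n_servers=8, virtual_nodes=50, d=2)
for i in range(1000):
    lb.dispatch(f"request-{i}")

mean_load = sum(lb.loads) / lb.n_servers
print("loads:", lb.loads)
print(f"imbalance ratio: {max(lb.loads) / mean_load:.2f}")  # closer to 1.0 is better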
# Theoretical analysis
class PowerOfDAnalysis:
"""Theoretical analysis of power of d choices"""
@staticmethod
def expected_max_load(n: int, d: int, m: int) -> float:
"""
Expected maximum load with power of d choices
n = number of servers (bins)
m = number of requests (balls)
d = number of choices
For d=1 (random): O(log n / log log n)
For d=2: O(log log n / log 2)
For d≥2: O(log log n / log d)
"""
if d == 1:
# Classical balls and bins
return np.log(n) / np.log(np.log(n)) * (m / n)
else:
return np.log(np.log(n)) / np.log(d) * (m / n)
@staticmethod
def load_imbalance_reduction(d: int) -> float:
"""
How much load imbalance reduces with d
Returns: Factor reduction in maximum load compared to random
"""
# Approximate reduction factor
return np.log(d) / np.log(2)
@staticmethod
def optimal_d(n: int, sampling_cost: float, imbalance_cost: float) -> int:
"""
Optimal d balancing sampling cost vs load imbalance
sampling_cost: Cost to sample one server
imbalance_cost: Cost per unit of load imbalance
"""
# Trade-off: More d reduces imbalance but increases sampling
optimal = 1
for d in range(2, n + 1):
# Expected imbalance with d choices
imbalance = PowerOfDAnalysis.expected_max_load(n, d, n)
imbalance_d1 = PowerOfDAnalysis.expected_max_load(n, 1, n)
imbalance_reduction = imbalance_d1 - imbalance
# Cost benefit
sampling_overhead = d * sampling_cost
imbalance_benefit = imbalance_reduction * imbalance_cost
if imbalance_benefit > sampling_overhead:
optimal = d
else:
break
return optimal
# Simulation and comparison
class LoadBalancingSimulator:
"""Simulate different load balancing strategies"""
def __init__(self, n_servers: int, total_requests: int):
self.n_servers = n_servers
self.total_requests = total_requests
def simulate_strategy(self, strategy: str, **kwargs) -> Dict:
"""Simulate a specific load balancing strategy"""
if strategy == 'random':
return self._simulate_random()
elif strategy == 'round_robin':
return self._simulate_round_robin()
elif strategy == 'least_loaded':
return self._simulate_least_loaded()
elif strategy == 'power_of_d':
d = kwargs.get('d', 2)
return self._simulate_power_of_d(d)
elif strategy == 'consistent_hashing':
return self._simulate_consistent_hashing()
elif strategy == 'weighted':
weights = kwargs.get('weights', [1] * self.n_servers)
return self._simulate_weighted(weights)
else:
raise ValueError(f"Unknown strategy: {strategy}")
def _simulate_random(self) -> Dict:
"""Random assignment"""
loads = [0] * self.n_servers
for _ in range(self.total_requests):
server = random.randint(0, self.n_servers - 1)
loads[server] += 1
return self._analyze_loads(loads, 'random')
def _simulate_round_robin(self) -> Dict:
"""Round robin assignment"""
loads = [0] * self.n_servers
current = 0
for _ in range(self.total_requests):
loads[current] += 1
current = (current + 1) % self.n_servers
return self._analyze_loads(loads, 'round_robin')
def _simulate_least_loaded(self) -> Dict:
"""Always choose least loaded server (check all)"""
loads = [0] * self.n_servers
for _ in range(self.total_requests):
# Find least loaded server
min_load = min(loads)
min_servers = [i for i, l in enumerate(loads) if l == min_load]
# Choose randomly among least loaded
server = random.choice(min_servers)
loads[server] += 1
return self._analyze_loads(loads, 'least_loaded')
def _simulate_power_of_d(self, d: int) -> Dict:
"""Power of d choices"""
loads = [0] * self.n_servers
for _ in range(self.total_requests):
# Sample d servers
sampled = random.sample(range(self.n_servers), min(d, self.n_servers))
# Find least loaded among sampled
min_load = min(loads[i] for i in sampled)
min_sampled = [i for i in sampled if loads[i] == min_load]
server = random.choice(min_sampled)
loads[server] += 1
return self._analyze_loads(loads, f'power_of_{d}')
def _simulate_consistent_hashing(self) -> Dict:
"""Consistent hashing"""
loads = [0] * self.n_servers
# Simple consistent hashing simulation
ring_size = 1000
ring = [i % self.n_servers for i in range(ring_size)]
for i in range(self.total_requests):
request_hash = hash(str(i)) % ring_size
server = ring[request_hash]
loads[server] += 1
return self._analyze_loads(loads, 'consistent_hashing')
def _simulate_weighted(self, weights: List[float]) -> Dict:
"""Weighted round robin"""
loads = [0] * self.n_servers
# Normalize weights
total_weight = sum(weights)
normalized = [w / total_weight for w in weights]
# Cumulative distribution
cumulative = []
current = 0
for w in normalized:
current += w
cumulative.append(current)
for i in range(self.total_requests):
r = random.random()
for server, cum in enumerate(cumulative):
if r <= cum:
loads[server] += 1
break
return self._analyze_loads(loads, 'weighted')
def _analyze_loads(self, loads: List[int], strategy: str) -> Dict:
"""Analyze load distribution"""
mean_load = np.mean(loads)
std_load = np.std(loads)
max_load = max(loads)
min_load = min(loads)
# Imbalance metrics
imbalance_ratio = max_load / mean_load if mean_load > 0 else float('inf')
cv = std_load / mean_load if mean_load > 0 else 0 # Coefficient of variation
return {
'strategy': strategy,
'mean_load': mean_load,
'std_load': std_load,
'max_load': max_load,
'min_load': min_load,
'imbalance_ratio': imbalance_ratio,
'coefficient_of_variation': cv,
'empty_servers': sum(1 for l in loads if l == 0),
'loads': loads.copy()
}
def compare_strategies(self, strategies: List[Tuple[str, Dict]]) -> List[Dict]:
"""Compare multiple strategies"""
results = []
for strategy_name, kwargs in strategies:
result = self.simulate_strategy(strategy_name, **kwargs)
results.append(result)
# Sort by imbalance ratio (lower is better)
results.sort(key=lambda x: x['imbalance_ratio'])
return results
Power of d Choices Insights:
- d=1: Random assignment → O(log n / log log n) maximum load
- d=2: Check two, pick least loaded → O(log log n) maximum load
- d=log n: Check a logarithmic number → constant-factor imbalance
- d=n: Check all (least loaded) → perfect balance
Key Theoretical Result: Checking just d=2 provides an exponential improvement over random assignment, and most of the benefit is already captured by d=2-5.
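To see these bounds empirically, here's a minimal driver for the LoadBalancingSimulator above (the server and request counts are arbitrary):

sim = LoadBalancingSimulator(n_servers=100, total_requests=10000)
strategies = [
    ('random', {}),
    ('power_of_d', {'d': 2}),
    ('power_of_d', {'d': 5}),
    ('least_loaded', {}),
]
for result in sim.compare_strategies(strategies):
    print(f"{result['strategy']:>15}: max={result['max_load']}, "
          f"imbalance={result['imbalance_ratio']:.2f}, "
          f"cv={result['coefficient_of_variation']:.3f}")

Even at this modest scale, d=2 typically lands much closer to least_loaded than to random.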
🔑 15. Consistent Hashing Theory
Mathematical analysis of consistent hashing properties.
import hashlib
import bisect
from typing import List, Dict, Set, Tuple
import numpy as np
import math
from collections import defaultdict
class ConsistentHashingTheory:
"""Theoretical analysis of consistent hashing"""
@staticmethod
def load_distribution(n_nodes: int,
n_items: int,
vnodes_per_node: int = 100) -> Dict:
"""
Analyze load distribution in consistent hashing
Returns: Statistical properties of load distribution
"""
# Simulate consistent hashing
ring_size = 2**32
ring = []
node_positions = defaultdict(list)
# Place virtual nodes
for node in range(n_nodes):
for vnode in range(vnodes_per_node):
position = ConsistentHashingTheory._hash_position(
f"node{node}-vnode{vnode}", ring_size
)
ring.append(position)
node_positions[node].append(position)
ring.sort()
# Simulate item placement
loads = [0] * n_nodes
for item in range(n_items):
position = ConsistentHashingTheory._hash_position(
f"item{item}", ring_size
)
# Find node responsible
idx = bisect.bisect_left(ring, position)
if idx == len(ring):
idx = 0
# Determine which node owns this position
for node, positions in node_positions.items():
if ring[idx] in positions:
loads[node] += 1
break
# Analyze distribution
mean_load = n_items / n_nodes
std_load = np.std(loads)
cv = std_load / mean_load if mean_load > 0 else 0
        # Theoretical bound: with k virtual nodes per node, the standard
        # deviation of per-node load shrinks roughly like 1/sqrt(k)
        theoretical_std = math.sqrt(mean_load / vnodes_per_node)
return {
'actual_mean': mean_load,
'actual_std': std_load,
'actual_cv': cv,
'theoretical_std': theoretical_std,
'max_load': max(loads),
'min_load': min(loads),
'imbalance_ratio': max(loads) / mean_load,
'loads': loads
}
@staticmethod
    def movement_cost(n_old: int,
                      n_new: int,
                      vnodes_per_node: int = 100,
                      total_items: int = 100000) -> Dict:
        """
        Calculate the fraction of items that move when the node count
        changes from n_old to n_new
        In consistent hashing, only O(1/n) items need to move
        """
        ring_size = 2**32
        # Create old ring
        old_ring = []
        old_nodes = {}
        for node in range(n_old):
            for vnode in range(vnodes_per_node):
                pos = ConsistentHashingTheory._hash_position(
                    f"node{node}-{vnode}", ring_size
                )
                old_ring.append(pos)
                old_nodes[pos] = node
        old_ring.sort()
        # Create new ring using the SAME keys for surviving nodes, so only
        # positions owned by added/removed nodes change hands
        new_ring = []
        new_nodes = {}
        for node in range(n_new):
            for vnode in range(vnodes_per_node):
                pos = ConsistentHashingTheory._hash_position(
                    f"node{node}-{vnode}", ring_size
                )
                new_ring.append(pos)
                new_nodes[pos] = node
        new_ring.sort()
# Calculate movement
moved_items = 0
for item in range(total_items):
item_pos = ConsistentHashingTheory._hash_position(
f"item{item}", ring_size
)
# Find responsible node in old ring
old_idx = bisect.bisect_left(old_ring, item_pos)
if old_idx == len(old_ring):
old_idx = 0
old_node = old_nodes[old_ring[old_idx]]
# Find responsible node in new ring
new_idx = bisect.bisect_left(new_ring, item_pos)
if new_idx == len(new_ring):
new_idx = 0
new_node = new_nodes[new_ring[new_idx]]
if old_node != new_node:
moved_items += 1
fraction_moved = moved_items / total_items
        theoretical_moved = abs(n_new - n_old) / max(n_old, n_new)  # e.g. 1/(n+1) when adding one node
return {
'actual_fraction_moved': fraction_moved,
'theoretical_fraction': theoretical_moved,
'absolute_moved': moved_items,
'efficiency': theoretical_moved / fraction_moved if fraction_moved > 0 else 0
}
@staticmethod
def virtual_nodes_analysis(n_nodes: int,
n_items: int,
vnodes_range: List[int]) -> List[Dict]:
"""
Analyze effect of virtual nodes on load balance
"""
results = []
for vnodes in vnodes_range:
dist = ConsistentHashingTheory.load_distribution(
n_nodes, n_items, vnodes
)
results.append({
'virtual_nodes': vnodes,
'coefficient_of_variation': dist['actual_cv'],
'imbalance_ratio': dist['imbalance_ratio'],
'theoretical_std': dist['theoretical_std'],
'actual_std': dist['actual_std']
})
return results
@staticmethod
def bounded_loads(n_nodes: int,
capacity: float,
vnodes_per_node: int = 100) -> Dict:
"""
Analyze bounded-load consistent hashing
Each node has capacity C, algorithm skips full nodes
"""
ring_size = 2**32
ring = []
node_loads = [0] * n_nodes
node_capacities = [capacity] * n_nodes
# Initialize ring
for node in range(n_nodes):
for vnode in range(vnodes_per_node):
pos = ConsistentHashingTheory._hash_position(
f"node{node}-{vnode}", ring_size
)
ring.append((pos, node))
ring.sort(key=lambda x: x[0])
# Simulate with bounded loads
rejected = 0
total_items = 10000
for item in range(total_items):
pos = ConsistentHashingTheory._hash_position(
f"item{item}", ring_size
)
# Find position in ring
idx = bisect.bisect_left([r[0] for r in ring], pos)
if idx == len(ring):
idx = 0
# Try nodes in order until finding one with capacity
placed = False
attempts = 0
max_attempts = len(ring) # Try entire ring
while not placed and attempts < max_attempts:
current_idx = (idx + attempts) % len(ring)
node = ring[current_idx][1]
if node_loads[node] < node_capacities[node]:
node_loads[node] += 1
placed = True
break
attempts += 1
if not placed:
rejected += 1
utilization = sum(node_loads) / (n_nodes * capacity)
rejection_rate = rejected / total_items
return {
'utilization': utilization,
'rejection_rate': rejection_rate,
'node_loads': node_loads,
'max_load': max(node_loads),
'min_load': min(node_loads),
'load_std': np.std(node_loads)
}
@staticmethod
def _hash_position(key: str, ring_size: int) -> int:
"""Hash key to position on ring"""
# Using MD5 for good distribution
hash_obj = hashlib.md5(key.encode())
hash_int = int(hash_obj.hexdigest(), 16)
return hash_int % ring_size
class RendezvousHashing:
"""
Rendezvous (Highest Random Weight) hashing
Alternative to consistent hashing with better load distribution
but O(n) computation for each key
"""
def __init__(self, nodes: List[str]):
self.nodes = nodes
def assign(self, key: str) -> str:
"""
Assign key to node with highest weight
weight = hash(key + node)
Choose node with maximum weight
"""
max_weight = -1
selected_node = self.nodes[0]
for node in self.nodes:
weight = self._combined_hash(key, node)
if weight > max_weight:
max_weight = weight
selected_node = node
return selected_node
def _combined_hash(self, key: str, node: str) -> int:
"""Combine hash of key and node"""
# Use SHA256 for good distribution
combined = f"{key}:{node}"
hash_obj = hashlib.sha256(combined.encode())
return int(hash_obj.hexdigest(), 16)
@staticmethod
def load_analysis(n_nodes: int, n_items: int) -> Dict:
"""Analyze load distribution for rendezvous hashing"""
nodes = [f"node{i}" for i in range(n_nodes)]
hasher = RendezvousHashing(nodes)
loads = [0] * n_nodes
for item in range(n_items):
key = f"item{item}"
node = hasher.assign(key)
node_idx = int(node[4:]) # Extract node number
loads[node_idx] += 1
mean_load = n_items / n_nodes
std_load = np.std(loads)
cv = std_load / mean_load
        # Theoretical: CV ≈ 1/sqrt(mean_load) for good hash functions
theoretical_cv = 1 / math.sqrt(mean_load)
return {
'mean_load': mean_load,
'actual_cv': cv,
'theoretical_cv': theoretical_cv,
'max_load': max(loads),
'min_load': min(loads),
'imbalance_ratio': max(loads) / mean_load
}
class JumpHash:
"""
Jump Hash: Consistent hashing without virtual nodes
O(log n) computation, minimal memory, perfect balance
But can only add/remove nodes at the end
"""
@staticmethod
def jump_hash(key: str, num_buckets: int) -> int:
"""
Assign key to bucket using jump hash algorithm
Based on: "A Fast, Minimal Memory, Consistent Hash Algorithm"
by John Lamping and Eric Veach (Google)
"""
        key_hash = ConsistentHashingTheory._hash_position(key, 2**64)
        b = -1
        j = 0
        while j < num_buckets:
            b = j
            # The reference algorithm relies on 64-bit integer overflow;
            # Python ints are unbounded, so mask back to 64 bits explicitly
            # (without the mask, key_hash >> 33 keeps growing and j sticks at 0)
            key_hash = (key_hash * 2862933555777941757 + 1) & 0xFFFFFFFFFFFFFFFF
            j = int((b + 1) * (2**31 / ((key_hash >> 33) + 1)))
        return b
@staticmethod
def analyze_performance(num_buckets_range: List[int],
num_keys: int = 100000) -> List[Dict]:
"""Analyze jump hash performance"""
results = []
for num_buckets in num_buckets_range:
loads = [0] * num_buckets
for key in range(num_keys):
bucket = JumpHash.jump_hash(str(key), num_buckets)
loads[bucket] += 1
mean_load = num_keys / num_buckets
std_load = np.std(loads)
cv = std_load / mean_load
# Jump hash provides near-perfect balance
results.append({
'buckets': num_buckets,
'mean_load': mean_load,
'std_load': std_load,
'coefficient_of_variation': cv,
'max_load': max(loads),
'min_load': min(loads),
'imbalance': max(loads) / mean_load
})
return results
@staticmethod
def movement_analysis(old_buckets: int, new_buckets: int) -> float:
"""
Calculate fraction of keys that move when changing bucket count
Jump hash minimizes movement to approximately 1/new_buckets
"""
moved = 0
total = 10000
for key in range(total):
old_bucket = JumpHash.jump_hash(str(key), old_buckets)
new_bucket = JumpHash.jump_hash(str(key), new_buckets)
if old_bucket != new_bucket:
moved += 1
return moved / total
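Assuming the 64-bit masking fix above, a quick sanity check that jump hash moves only about 1/new_buckets of the keys when a bucket is appended:

fraction = JumpHash.movement_analysis(old_buckets=10, new_buckets=11)
print(f"fraction moved: {fraction:.3f} (theory ~{1/11:.3f})")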
# Comparison of hashing algorithms
class HashingAlgorithmComparison:
"""Compare different consistent hashing algorithms"""
@staticmethod
def compare_algorithms(n_nodes: int,
n_items: int,
vnodes: int = 100) -> List[Dict]:
"""Compare multiple hashing algorithms"""
results = []
# 1. Consistent hashing with virtual nodes
ch_results = ConsistentHashingTheory.load_distribution(
n_nodes, n_items, vnodes
)
results.append({
'algorithm': 'consistent_hashing',
'vnodes': vnodes,
'cv': ch_results['actual_cv'],
'imbalance': ch_results['imbalance_ratio'],
'memory': n_nodes * vnodes,
'computation': 'O(log(vnodes*n))'
})
# 2. Rendezvous hashing
rh_results = RendezvousHashing.load_analysis(n_nodes, n_items)
results.append({
'algorithm': 'rendezvous_hashing',
'cv': rh_results['actual_cv'],
'imbalance': rh_results['imbalance_ratio'],
'memory': 'O(n)',
'computation': 'O(n)'
})
# 3. Jump hash
jh_results = JumpHash.analyze_performance([n_nodes], n_items)[0]
results.append({
'algorithm': 'jump_hash',
'cv': jh_results['coefficient_of_variation'],
'imbalance': jh_results['imbalance'],
'memory': 'O(1)',
'computation': 'O(log n)',
'limitation': 'Can only add/remove at end'
})
        # 4. Maglev hashing (Google's approach)
        # Not implemented here, but known properties
        results.append({
            'algorithm': 'maglev_hashing',
            'cv': 'Near 0',
            'imbalance': 1.0,  # near-perfect balance
            'memory': 'O(n) lookup table',
            'computation': 'O(1)',
            'note': "Used in Google's load balancers"
        })
        # Sort by imbalance (lower is better); non-numeric entries sort last
        results.sort(key=lambda x: x['imbalance']
                     if isinstance(x.get('imbalance'), (int, float))
                     else float('inf'))
return results
@staticmethod
def recommend_algorithm(requirements: Dict) -> str:
"""Recommend hashing algorithm based on requirements"""
n_nodes = requirements.get('n_nodes', 10)
n_items = requirements.get('n_items', 1000000)
need_perfect_balance = requirements.get('perfect_balance', False)
need_fast_lookup = requirements.get('fast_lookup', True)
need_dynamic = requirements.get('dynamic_nodes', True)
memory_constrained = requirements.get('memory_constrained', False)
if need_perfect_balance and need_fast_lookup and not memory_constrained:
return "maglev_hashing"
elif memory_constrained and not need_dynamic:
return "jump_hash"
elif need_fast_lookup and n_nodes <= 1000:
return "consistent_hashing_with_vnodes"
elif n_nodes <= 100 and need_perfect_balance:
return "rendezvous_hashing"
else:
return "consistent_hashing_with_vnodes"
# Practical implementation with theoretical guarantees
class TheoreticalConsistentHash:
"""
Consistent hashing with theoretical guarantees
Implements algorithm with provable bounds on:
1. Load imbalance
2. Movement cost
3. Lookup time
"""
def __init__(self, nodes: List[str], vnodes_per_node: int = 100):
self.nodes = nodes
self.vnodes_per_node = vnodes_per_node
self.ring_size = 2**32
# Build ring with provable properties
self.ring, self.node_map = self._build_ring()
# Statistics for theoretical analysis
self.stats = {
'lookups': 0,
'movements': 0,
'loads': {node: 0 for node in nodes}
}
def _build_ring(self) -> Tuple[List[int], Dict[int, str]]:
"""Build ring with theoretical guarantees"""
ring = []
node_map = {}
for node in self.nodes:
# Use deterministic virtual node positions
for vnode in range(self.vnodes_per_node):
# Use SHA256 for uniform distribution
key = f"{node}:{vnode}"
hash_bytes = hashlib.sha256(key.encode()).digest()
# Use first 4 bytes for position
position = int.from_bytes(hash_bytes[:4], byteorder='big')
position = position % self.ring_size
ring.append(position)
node_map[position] = node
ring.sort()
return ring, node_map
def get_node(self, key: str) -> str:
"""Get node for key with O(log n) lookup"""
self.stats['lookups'] += 1
# Hash key
hash_bytes = hashlib.sha256(key.encode()).digest()
key_pos = int.from_bytes(hash_bytes[:4], byteorder='big')
key_pos = key_pos % self.ring_size
# Binary search in ring
idx = bisect.bisect_left(self.ring, key_pos)
if idx == len(self.ring):
idx = 0
position = self.ring[idx]
node = self.node_map[position]
# Update load statistics
self.stats['loads'][node] += 1
return node
def add_node(self, new_node: str):
"""Add node with minimal movement"""
old_loads = self.stats['loads'].copy()
# Add new node to ring
self.nodes.append(new_node)
self.stats['loads'][new_node] = 0
# Rebuild ring (in practice, incremental update)
self.ring, self.node_map = self._build_ring()
# Calculate movements
movements = 0
# In practice, would track which keys moved
self.stats['movements'] += movements
def remove_node(self, node_to_remove: str):
"""Remove node with controlled movement"""
if node_to_remove not in self.nodes:
return
old_loads = self.stats['loads'].copy()
# Remove node
self.nodes.remove(node_to_remove)
del self.stats['loads'][node_to_remove]
# Rebuild ring
self.ring, self.node_map = self._build_ring()
# Redistribute load
total_redistributed = old_loads[node_to_remove]
avg_redistribute = total_redistributed / len(self.nodes)
for node in self.nodes:
self.stats['loads'][node] += avg_redistribute
def get_theoretical_bounds(self) -> Dict:
"""Get theoretical performance bounds"""
n = len(self.nodes)
k = self.vnodes_per_node
# Load imbalance bound
        # With k virtual nodes, max load ≤ (1 + ε) * average load
        # where ε = O(1/√k)
epsilon = 1 / math.sqrt(k)
max_load_bound = (1 + epsilon) * (sum(self.stats['loads'].values()) / n)
# Movement bound when adding/removing node
        # Fraction moved ≤ O(1/n)
movement_bound = 1 / n
# Lookup time
lookup_time = math.log2(n * k) # Binary search in ring
return {
'max_load_bound': max_load_bound,
'actual_max_load': max(self.stats['loads'].values()),
'movement_bound_per_node_change': movement_bound,
'lookup_time_complexity': f'O(log({n * k})) = {lookup_time:.1f}',
'memory_usage': f'{n * k} entries',
'load_imbalance_ratio': max(self.stats['loads'].values()) /
(sum(self.stats['loads'].values()) / n)
}
def verify_bounds(self, num_tests: int = 10000) -> Dict:
"""Verify theoretical bounds through simulation"""
# Test load distribution
test_loads = {node: 0 for node in self.nodes}
for i in range(num_tests):
key = f"test_key_{i}"
node = self.get_node(key)
test_loads[node] += 1
actual_max = max(test_loads.values())
average = num_tests / len(self.nodes)
actual_imbalance = actual_max / average
bounds = self.get_theoretical_bounds()
theoretical_imbalance = bounds['max_load_bound'] / average
return {
'actual_imbalance_ratio': actual_imbalance,
'theoretical_imbalance_bound': theoretical_imbalance,
'within_bounds': actual_imbalance <= theoretical_imbalance,
'actual_max_load': actual_max,
'average_load': average
}
Consistent Hashing Theory Summary:
- Load Balance: With k virtual nodes, imbalance ≤ O(1/√k)
- Movement Cost: Adding/removing a node moves O(1/n) items
- Lookup Time: O(log n) with binary search
- Memory: O(nk) for n nodes, k virtual nodes
Algorithm Comparison:
| Algorithm | Imbalance | Lookup | Memory | Dynamic |
|---|---|---|---|---|
| Consistent Hashing | O(1/√k) | O(log n) | O(nk) | ✓ |
| Rendezvous | O(1/√n) | O(n) | O(n) | ✓ |
| Jump Hash | Perfect | O(log n) | O(1) | ✗ (append/remove at end only) |
| Maglev | Near-perfect | O(1) | O(n) | Limited |
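The O(1/√k) balance claim is easy to check with the tools above by sweeping the virtual-node count (the sizes here are illustrative):

for row in ConsistentHashingTheory.virtual_nodes_analysis(
        n_nodes=20, n_items=5000, vnodes_range=[10, 50, 100, 200]):
    print(f"k={row['virtual_nodes']:>4}: "
          f"cv={row['coefficient_of_variation']:.3f}, "
          f"imbalance={row['imbalance_ratio']:.2f}")

Doubling k should cut the coefficient of variation by roughly √2.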
📊 16. The Tail at Scale: Why p99 and p999 Matter
Understanding and managing latency outliers in distributed systems.
import numpy as np
from typing import List, Dict, Tuple
import math
from scipy import stats
import random
class TailLatencyAnalysis:
"""Analysis of tail latency in distributed systems"""
@staticmethod
def why_tails_matter(mean_latency: float,
std_latency: float,
distribution: str = 'normal') -> Dict:
"""
Demonstrate why p99, p999 matter more than mean
In distributed systems, user experience is determined
by worst-case latency, not average
"""
if distribution == 'normal':
# For normal distribution
p50 = mean_latency
p90 = mean_latency + 1.28 * std_latency
p95 = mean_latency + 1.645 * std_latency
p99 = mean_latency + 2.326 * std_latency
p999 = mean_latency + 3.09 * std_latency
elif distribution == 'exponential':
# For exponential distribution (common in queues)
            # P(X > x) = e^(-λx), where λ = 1/mean
rate = 1 / mean_latency
p50 = -math.log(0.5) / rate
p90 = -math.log(0.1) / rate
p95 = -math.log(0.05) / rate
p99 = -math.log(0.01) / rate
p999 = -math.log(0.001) / rate
else:
raise ValueError(f"Unknown distribution: {distribution}")
# Amplification factors
p90_factor = p90 / mean_latency
p99_factor = p99 / mean_latency
p999_factor = p999 / mean_latency
return {
'mean': mean_latency,
'p50': p50,
'p90': p90,
'p95': p95,
'p99': p99,
'p999': p999,
'p90_amplification': p90_factor,
'p99_amplification': p99_factor,
'p999_amplification': p999_factor,
'distribution': distribution
}
@staticmethod
def fanout_amplification(service_latency: Dict,
fanout: int,
parallelism: int) -> Dict:
"""
Calculate how tail latency amplifies with fanout
When service calls multiple backends, overall latency
determined by slowest backend
"""
mean = service_latency['mean']
std = service_latency.get('std', mean * 0.5) # Default CV=0.5
# Simulate fanout
n_simulations = 10000
overall_latencies = []
for _ in range(n_simulations):
            # Simulate backend latencies (clipped at zero; a normal can go negative)
            backend_latencies = np.maximum(0, np.random.normal(mean, std, fanout))
# Process in parallel batches
batch_size = parallelism
batch_times = []
for i in range(0, fanout, batch_size):
batch = backend_latencies[i:i+batch_size]
batch_time = max(batch) # Parallel, limited by slowest
batch_times.append(batch_time)
# Sequential batches
total_time = sum(batch_times)
overall_latencies.append(total_time)
# Calculate percentiles
overall_latencies = np.array(overall_latencies)
return {
'fanout': fanout,
'parallelism': parallelism,
'mean_overall': np.mean(overall_latencies),
'p50_overall': np.percentile(overall_latencies, 50),
'p90_overall': np.percentile(overall_latencies, 90),
'p99_overall': np.percentile(overall_latencies, 99),
'p999_overall': np.percentile(overall_latencies, 99.9),
'amplification_p99': np.percentile(overall_latencies, 99) / mean,
'expected_parallel': mean * math.ceil(fanout / parallelism),
'actual_vs_expected_ratio': np.mean(overall_latencies) /
(mean * math.ceil(fanout / parallelism))
}
@staticmethod
def coordinated_omission(measurement_interval: float,
service_time_mean: float,
service_time_std: float,
think_time_mean: float) -> Dict:
"""
Demonstrate coordinated omission problem
When system is busy, measurements miss the true latency
because requests are delayed before even starting
"""
        # Requests are *intended* on a fixed schedule (open-loop), but each
        # one must wait for the single server to become free. A closed-loop
        # benchmark records only the service time and silently omits that
        # wait -- this is coordinated omission
        simulation_time = 1000
        intended_time = 0.0
        server_free_at = 0.0
        request_times = []    # naive: service time only
        total_times = []      # correct: queueing delay + service time
        queueing_delays = []
        while intended_time < simulation_time:
            service_time = max(0, np.random.normal(service_time_mean,
                                                   service_time_std))
            start = max(intended_time, server_free_at)  # wait if server is busy
            completion = start + service_time
            server_free_at = completion
            request_times.append(service_time)
            queueing_delays.append(start - intended_time)
            total_times.append(completion - intended_time)
            # The next request is scheduled independently of completions
            intended_time += np.random.exponential(think_time_mean)
        # Naive measurement (service time only)
        naive_mean = np.mean(request_times)
        naive_p99 = np.percentile(request_times, 99)
        # Correct measurement (including queueing delay)
        correct_mean = np.mean(total_times)
        correct_p99 = np.percentile(total_times, 99)
return {
'naive_mean': naive_mean,
'naive_p99': naive_p99,
'correct_mean': correct_mean,
'correct_p99': correct_p99,
'omission_error_mean': (correct_mean - naive_mean) / naive_mean,
'omission_error_p99': (correct_p99 - naive_p99) / naive_p99,
'queueing_delay_mean': np.mean(queueing_delays),
'max_queueing_delay': max(queueing_delays) if queueing_delays else 0
}
class TailReductionTechniques:
"""Techniques for reducing tail latency"""
@staticmethod
def hedging_requests(service_latency_mean: float,
service_latency_std: float,
hedge_delay: float,
n_requests: int = 10000) -> Dict:
"""
Hedge requests: Send duplicate request after delay
Reduces tail latency at cost of extra load
"""
primary_times = np.random.normal(service_latency_mean,
service_latency_std,
n_requests)
hedge_times = []
for i in range(n_requests):
# Send hedge after delay
hedge_start = hedge_delay
hedge_service = np.random.normal(service_latency_mean,
service_latency_std)
hedge_completion = hedge_start + hedge_service
# Take whichever finishes first
completion = min(primary_times[i], hedge_completion)
hedge_times.append(completion)
hedge_times = np.array(hedge_times)
improvement_p99 = (np.percentile(primary_times, 99) -
np.percentile(hedge_times, 99))
improvement_p999 = (np.percentile(primary_times, 99.9) -
np.percentile(hedge_times, 99.9))
        # Cost: a hedge is actually *sent* whenever the primary has not
        # finished within the hedge delay; it "wins" if it finishes first
        hedges_sent = np.mean(primary_times > hedge_delay)
        hedge_fraction = np.mean(hedge_times < primary_times)
return {
'hedge_delay': hedge_delay,
'original_p99': np.percentile(primary_times, 99),
'hedged_p99': np.percentile(hedge_times, 99),
'original_p999': np.percentile(primary_times, 99.9),
'hedged_p999': np.percentile(hedge_times, 99.9),
'p99_improvement': improvement_p99,
'p999_improvement': improvement_p999,
'hedge_used_fraction': hedge_fraction,
            'extra_load': hedges_sent * 100  # Percentage of extra requests actually sent
}
@staticmethod
def canary_routing(traffic_fraction: float,
fast_pool_mean: float,
slow_pool_mean: float,
std: float) -> Dict:
"""
Canary routing: Send fraction to different pools
Helps identify and avoid slow instances
"""
n_requests = 10000
canary_count = int(n_requests * traffic_fraction)
main_count = n_requests - canary_count
# Generate latencies
main_latencies = np.random.normal(fast_pool_mean, std, main_count)
canary_latencies = np.random.normal(slow_pool_mean, std, canary_count)
# Combine
all_latencies = np.concatenate([main_latencies, canary_latencies])
# With perfect detection (avoid slow pool)
# In reality, detection has some delay
perfect_latencies = np.random.normal(fast_pool_mean, std, n_requests)
return {
'traffic_fraction_to_slow': traffic_fraction,
'with_canary_p99': np.percentile(all_latencies, 99),
'perfect_avoidance_p99': np.percentile(perfect_latencies, 99),
'improvement_if_perfect': (np.percentile(all_latencies, 99) -
np.percentile(perfect_latencies, 99)),
'slow_pool_detection_delay': 'Simulated separately'
}
@staticmethod
def latency_injection_detection(true_latency_mean: float,
true_latency_std: float,
injection_prob: float,
injection_amount: float) -> Dict:
"""
Detect and mitigate latency injections
Some requests randomly take much longer (GC, compaction, etc.)
"""
n_requests = 10000
latencies = []
for _ in range(n_requests):
base_latency = np.random.normal(true_latency_mean, true_latency_std)
# Random injection
if random.random() < injection_prob:
latency = base_latency + injection_amount
else:
latency = base_latency
latencies.append(latency)
latencies = np.array(latencies)
# Mitigation: timeout and retry
timeout = np.percentile(latencies, 95) # 95th percentile as timeout
mitigated_latencies = []
retries = 0
for latency in latencies:
if latency > timeout:
# Retry once
retry_latency = np.random.normal(true_latency_mean, true_latency_std)
total_latency = timeout + retry_latency # Wait timeout then retry
retries += 1
else:
total_latency = latency
mitigated_latencies.append(total_latency)
mitigated_latencies = np.array(mitigated_latencies)
return {
'injection_probability': injection_prob,
'injection_amount': injection_amount,
'original_p99': np.percentile(latencies, 99),
'mitigated_p99': np.percentile(mitigated_latencies, 99),
'improvement': np.percentile(latencies, 99) - np.percentile(mitigated_latencies, 99),
'retry_rate': retries / n_requests,
'timeout_used': timeout,
'cost_extra_load': retries / n_requests
}
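A small sweep over hedge delays makes the latency/load trade-off concrete (all parameters are invented):

for delay in [5.0, 10.0, 20.0]:
    r = TailReductionTechniques.hedging_requests(
        service_latency_mean=10.0,
        service_latency_std=5.0,
        hedge_delay=delay)
    print(f"hedge_delay={delay:>5}: "
          f"p99 {r['original_p99']:.1f} -> {r['hedged_p99']:.1f}, "
          f"extra load={r['extra_load']:.1f}%")

Later hedges send fewer duplicates but rescue less of the tail; in practice the hedge delay is often set near the p95 of the primary's latency.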
class StatisticalAnalysis:
"""Statistical tools for tail analysis"""
@staticmethod
def extreme_value_theory(n_samples: int,
distribution: str = 'normal',
block_size: int = 100) -> Dict:
"""
Apply Extreme Value Theory to model tail behavior
EVT models distribution of maximum values
"""
if distribution == 'normal':
samples = np.random.normal(0, 1, n_samples)
elif distribution == 'exponential':
samples = np.random.exponential(1, n_samples)
else:
raise ValueError(f"Unsupported distribution: {distribution}")
# Block maxima approach
n_blocks = n_samples // block_size
block_maxima = []
for i in range(n_blocks):
block = samples[i*block_size:(i+1)*block_size]
block_maxima.append(max(block))
# Fit GEV (Generalized Extreme Value) distribution
# Simplified: Use Gumbel distribution (Type I EVT)
block_maxima = np.array(block_maxima)
# Method of moments for Gumbel
euler_mascheroni = 0.5772156649
mean_max = np.mean(block_maxima)
std_max = np.std(block_maxima)
# Gumbel parameters
beta = std_max * math.sqrt(6) / math.pi
mu = mean_max - euler_mascheroni * beta
# Predict extreme percentiles
def gumbel_cdf(x, mu, beta):
return math.exp(-math.exp(-(x - mu) / beta))
# Find x such that CDF(x) = p
def gumbel_quantile(p, mu, beta):
return mu - beta * math.log(-math.log(p))
p9999 = gumbel_quantile(0.9999, mu, beta)
p99999 = gumbel_quantile(0.99999, mu, beta)
return {
'distribution': distribution,
'block_size': block_size,
'n_blocks': n_blocks,
'gumbel_mu': mu,
'gumbel_beta': beta,
'predicted_p9999': p9999,
'predicted_p99999': p99999,
'empirical_max': max(samples),
            'evt_applicable': n_blocks >= 30  # need enough blocks for a stable fit
}
@staticmethod
def heavy_tail_detection(samples: np.ndarray) -> Dict:
"""
Detect heavy-tailed distributions
Heavy tails cause extreme outliers that dominate p99, p999
"""
# Calculate moments
mean = np.mean(samples)
std = np.std(samples)
skew = stats.skew(samples)
kurtosis = stats.kurtosis(samples) # Excess kurtosis
        # Hill estimator for the tail index (assumes positive-valued samples)
sorted_samples = np.sort(samples)[::-1] # Descending
k = min(100, len(samples) // 10) # Number of largest values to use
if k > 1:
hill_estimator = 1 / (np.mean(np.log(sorted_samples[:k])) -
np.log(sorted_samples[k]))
else:
hill_estimator = float('inf')
# Check for heavy tail (kurtosis > 3, or tail index < 3)
is_heavy_tailed = kurtosis > 3 or hill_estimator < 3
# Compare percentiles
p50 = np.percentile(samples, 50)
p99 = np.percentile(samples, 99)
p999 = np.percentile(samples, 99.9)
tail_ratio_99 = p99 / p50
tail_ratio_999 = p999 / p50
return {
'mean': mean,
'std': std,
'skewness': skew,
'kurtosis': kurtosis,
'hill_estimator': hill_estimator,
'is_heavy_tailed': is_heavy_tailed,
'p50': p50,
'p99': p99,
'p999': p999,
'tail_ratio_99': tail_ratio_99,
'tail_ratio_999': tail_ratio_999,
'heavy_tail_severity': 'severe' if tail_ratio_999 > 10 else
'moderate' if tail_ratio_999 > 5 else
'light'
}
# Practical guidance
class TailLatencyGuidelines:
"""Practical guidelines for managing tail latency"""
@staticmethod
def design_for_tail():
"""Design principles for tail latency"""
return [
"1. Measure p99, p999, not just average",
"2. Use hedged requests for idempotent operations",
"3. Implement canary routing and load shedding",
"4. Set aggressive timeouts and retry with backoff",
"5. Use load balancing with health checks",
"6. Implement circuit breakers for failing dependencies",
"7. Use queue management and admission control",
"8. Monitor and alert on latency outliers"
]
@staticmethod
def monitoring_recommendations():
"""Monitoring recommendations for tail latency"""
return {
'metrics_to_track': [
'Histograms (not averages)',
'Percentiles: p50, p90, p95, p99, p999',
'Error rates at different percentiles',
'Timeouts and retry counts',
'Queue lengths and wait times'
],
'alerting_thresholds': [
'p99 > SLA * 2',
'p999 > SLA * 3',
'Error rate at p99 > 1%',
'Consecutive timeouts > 5'
],
'analysis_tools': [
'Heatmaps over time',
'Correlation with other metrics',
'Comparative analysis (A/B testing)',
'Root cause analysis for outliers'
]
}
@staticmethod
def capacity_planning_tail(sla_p99: float,
sla_p999: float,
current_p99: float,
current_p999: float,
growth_rate: float) -> Dict:
"""Capacity planning considering tail latency"""
        # Tail latency often scales super-linearly with load
        # Assume p99 ∝ load^α, where α > 1
        # Estimate α from current measurements
        # Simplified: α = log(p99_high_load / p99_low_load) / log(load_ratio)
        # In practice, measure at different load levels
        # For planning, use a conservative α = 1.5 (common for queueing systems)
        alpha = 1.5
# Current utilization (estimated)
current_utilization = 0.7 # 70% utilization
# Load at which SLA would be violated
def load_at_violation(current_load, current_latency, target_latency, alpha):
            # latency ∝ load^alpha, so
            # target_latency / current_latency = (target_load / current_load)^alpha
ratio = target_latency / current_latency
target_load = current_load * (ratio ** (1/alpha))
return target_load
load_violation_p99 = load_at_violation(
current_utilization, current_p99, sla_p99, alpha
)
load_violation_p999 = load_at_violation(
current_utilization, current_p999, sla_p999, alpha
)
# Which violates first
first_violation_load = min(load_violation_p99, load_violation_p999)
violating_metric = 'p99' if load_violation_p99 < load_violation_p999 else 'p999'
# Time to violation with growth
time_to_violation = math.log(first_violation_load / current_utilization) / math.log(1 + growth_rate)
return {
'alpha_estimated': alpha,
'current_utilization': current_utilization,
'load_at_p99_violation': load_violation_p99,
'load_at_p999_violation': load_violation_p999,
'first_violation_at_load': first_violation_load,
'violating_metric': violating_metric,
            'time_to_violation_months': time_to_violation * 12,  # growth periods → months, assuming growth_rate is annual
'recommendation': f'Plan capacity upgrade within {time_to_violation:.1f} growth periods'
}
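For example, with a 100 ms p99 SLA and 5% growth per period (all numbers invented):

plan = TailLatencyGuidelines.capacity_planning_tail(
    sla_p99=100.0, sla_p999=300.0,
    current_p99=60.0, current_p999=200.0,
    growth_rate=0.05)
print(plan['violating_metric'])   # which percentile breaches first
print(plan['recommendation'])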
Tail at Scale Key Insights:
- Percentiles Matter: p99 says far more about user experience than the mean
- Fanout Amplification: Calling N services in parallel → latency = max(service latencies)
- Coordinated Omission: Closed-loop benchmarks hide true latency by omitting queueing delay
- Heavy Tails: Real-world latency distributions often have heavy tails
- Mitigation Techniques: Hedging, canary routing, timeouts, retries
Empirical Findings (Dean & Barroso, "The Tail at Scale"):
- p99 latency is often 10x p50, and p999 often 100x p50
- With a fanout of 100, a 1-in-100 slow backend makes most requests slow: 1 - 0.99^100 ≈ 63%
- Hedged requests sent after a short delay can cut tail latency dramatically for only a few percent extra load
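A minimal sketch of that fanout arithmetic (independence of backends is assumed):

# P(at least one of N parallel backends exceeds its own p99) = 1 - 0.99^N
for fanout in [1, 10, 100]:
    p = 1 - 0.99 ** fanout
    print(f"fanout={fanout:>4}: P(request sees a p99-slow backend) = {p:.0%}")
# fanout=100 -> ~63%: one server's p99 becomes the common case at scale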
📐 17. Network Calculus: Deterministic Performance Bounds
Mathematical framework for guaranteed performance in networks.
import numpy as np
from typing import List, Tuple, Callable
import math
class ArrivalCurve:
"""Arrival curve ฮฑ(t) bounds arrivals in any interval"""
def __init__(self, rate: float, burst: float):
"""
        α(t) = rate * t + burst
rate: long-term arrival rate
burst: maximum burst size
"""
self.rate = rate
self.burst = burst
def bound(self, interval: float) -> float:
"""Maximum arrivals in time interval t"""
return self.rate * interval + self.burst
def is_conformant(self, arrivals: List[Tuple[float, float]]) -> bool:
"""
Check if sequence of (time, amount) conforms to curve
        For all intervals [s, t], arrivals in the interval ≤ α(t-s)
"""
n = len(arrivals)
for i in range(n):
for j in range(i, n):
start_time, _ = arrivals[i]
end_time, _ = arrivals[j]
interval = end_time - start_time
# Sum arrivals in [start_time, end_time]
total = sum(amount for time, amount in arrivals[i:j+1]
if start_time <= time <= end_time)
if total > self.bound(interval):
return False
return True
class ServiceCurve:
"""Service curve ฮฒ(t) bounds service provided"""
def __init__(self, rate: float, latency: float = 0):
"""
        β(t) = max(0, rate * (t - latency))
rate: service rate
latency: initial latency before service starts
"""
self.rate = rate
self.latency = latency
def bound(self, interval: float) -> float:
"""Minimum service in time interval t"""
return max(0, self.rate * (interval - self.latency))
class NetworkCalculus:
"""Network Calculus operations"""
@staticmethod
def convolution(alpha: ArrivalCurve, beta: ServiceCurve) -> ArrivalCurve:
"""
        Output bound: α ⊘ β (min-plus deconvolution)
        After passing through a node with service curve β,
        the output traffic is bounded by α ⊘ β
        """
        # For affine curves: (r, b) ⊘ (R, T) = (r, b + r * T)
# If r โค R (stable system)
if alpha.rate > beta.rate:
raise ValueError("Unstable: arrival rate > service rate")
output_rate = alpha.rate
output_burst = alpha.burst + alpha.rate * beta.latency
return ArrivalCurve(output_rate, output_burst)
@staticmethod
def concatenation(beta1: ServiceCurve, beta2: ServiceCurve) -> ServiceCurve:
"""
        Concatenation of service curves: β1 ⊗ β2 (min-plus convolution)
Service through two nodes in series
"""
        # For rate-latency curves: (R1, T1) ⊗ (R2, T2) = (min(R1, R2), T1 + T2)
combined_rate = min(beta1.rate, beta2.rate)
combined_latency = beta1.latency + beta2.latency
return ServiceCurve(combined_rate, combined_latency)
@staticmethod
def backlog_bound(alpha: ArrivalCurve, beta: ServiceCurve) -> float:
"""
Maximum backlog (queue size) bound
        sup_t [α(t) - β(t)]
"""
# For affine arrival and rate-latency service:
# max backlog = b + r * T
return alpha.burst + alpha.rate * beta.latency
@staticmethod
def delay_bound(alpha: ArrivalCurve, beta: ServiceCurve) -> float:
"""
Maximum delay bound
Horizontal deviation between ฮฑ and ฮฒ
"""
if alpha.rate > beta.rate:
return float('inf') # Unstable
# For affine arrival and rate-latency service:
# max delay = T + b / R
return beta.latency + alpha.burst / beta.rate
@staticmethod
def leftover_service(alpha: ArrivalCurve,
beta: ServiceCurve,
priority: str = 'strict') -> ServiceCurve:
"""
Service curve left for other traffic
        After serving traffic with arrival curve α
"""
if priority == 'strict':
            # Strict priority: β'(t) = max(0, β(t) - α(t))
# For affine curves, approximate
leftover_rate = max(0, beta.rate - alpha.rate)
# Latency increases
extra_latency = alpha.burst / beta.rate if beta.rate > 0 else float('inf')
leftover_latency = beta.latency + extra_latency
return ServiceCurve(leftover_rate, leftover_latency)
else:
# Other scheduling policies more complex
raise NotImplementedError(f"Priority {priority} not implemented")
class DeterministicNetworkAnalysis:
"""Analyze network with deterministic guarantees"""
def __init__(self):
self.flows = {} # flow_id -> ArrivalCurve
self.nodes = {} # node_id -> ServiceCurve
self.routing = {} # flow_id -> List[node_id]
def add_flow(self, flow_id: str, arrival_curve: ArrivalCurve):
self.flows[flow_id] = arrival_curve
def add_node(self, node_id: str, service_curve: ServiceCurve):
self.nodes[node_id] = service_curve
def set_routing(self, flow_id: str, path: List[str]):
self.routing[flow_id] = path
def analyze_flow(self, flow_id: str) -> Dict:
"""Analyze end-to-end guarantees for flow"""
if flow_id not in self.flows:
raise ValueError(f"Flow {flow_id} not found")
if flow_id not in self.routing:
raise ValueError(f"Routing for flow {flow_id} not found")
arrival = self.flows[flow_id]
path = self.routing[flow_id]
# Combine service curves along path
if not path:
raise ValueError(f"Empty path for flow {flow_id}")
# Start with first node
combined_service = self.nodes[path[0]]
# Concatenate remaining nodes
for node_id in path[1:]:
node_service = self.nodes[node_id]
combined_service = NetworkCalculus.concatenation(
combined_service, node_service
)
# Calculate bounds
max_backlog = NetworkCalculus.backlog_bound(arrival, combined_service)
max_delay = NetworkCalculus.delay_bound(arrival, combined_service)
# Output arrival curve
output_arrival = NetworkCalculus.convolution(arrival, combined_service)
return {
'flow_id': flow_id,
'path': path,
'arrival_curve': (arrival.rate, arrival.burst),
'service_curve': (combined_service.rate, combined_service.latency),
'max_backlog': max_backlog,
'max_delay': max_delay,
'output_curve': (output_arrival.rate, output_arrival.burst),
'stable': arrival.rate <= combined_service.rate
}
def analyze_node(self, node_id: str) -> Dict:
"""Analyze node utilization and leftover service"""
if node_id not in self.nodes:
raise ValueError(f"Node {node_id} not found")
node_service = self.nodes[node_id]
# Find all flows through this node
flows_through = []
total_arrival = ArrivalCurve(0, 0)
for flow_id, path in self.routing.items():
if node_id in path:
flows_through.append(flow_id)
flow_arrival = self.flows[flow_id]
# Sum arrival curves (assuming independence)
total_arrival.rate += flow_arrival.rate
total_arrival.burst += flow_arrival.burst
# Calculate utilization
utilization = total_arrival.rate / node_service.rate if node_service.rate > 0 else float('inf')
# Leftover service
leftover = NetworkCalculus.leftover_service(total_arrival, node_service)
return {
'node_id': node_id,
'service_curve': (node_service.rate, node_service.latency),
'flows_through': flows_through,
'total_arrival': (total_arrival.rate, total_arrival.burst),
'utilization': utilization,
'leftover_service': (leftover.rate, leftover.latency),
'stable': total_arrival.rate <= node_service.rate
}
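Putting the pieces together, a sketch of an end-to-end bound for one flow through two switches (all rates and latencies are invented; units just need to be consistent):

net = DeterministicNetworkAnalysis()
net.add_flow('sensor', ArrivalCurve(rate=1e6, burst=1500))    # 1M units/s, 1500-unit burst
net.add_node('sw1', ServiceCurve(rate=100e6, latency=10e-6))  # 100M units/s, 10 us latency
net.add_node('sw2', ServiceCurve(rate=100e6, latency=10e-6))
net.set_routing('sensor', ['sw1', 'sw2'])

report = net.analyze_flow('sensor')
print(f"max delay:   {report['max_delay'] * 1e6:.1f} us")  # T + b/R = 20 + 15 = 35 us
print(f"max backlog: {report['max_backlog']:.0f} units")   # b + r*T = 1520
print(f"stable:      {report['stable']}")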
class TSN_Analysis:
"""Time-Sensitive Networking analysis using Network Calculus"""
@staticmethod
def time_aware_shaping(cycle_time: float,
slot_duration: float,
frame_sizes: List[float],
link_speed: float) -> Dict:
"""
Analyze Time-Aware Shaping (802.1Qbv)
Cycle repeats every cycle_time
Slots within cycle for different traffic classes
"""
n_slots = int(cycle_time / slot_duration)
# Service curve for a traffic class with dedicated slot
def service_curve_for_slot(slot_index: int, frame_size: float) -> ServiceCurve:
"""
Service during specific slot each cycle
Simplified: Can send one frame per cycle
More accurate: Account for transmission time
"""
# Minimum service: one frame per cycle
min_rate = frame_size / cycle_time
# Maximum latency: wait entire cycle
max_latency = cycle_time
return ServiceCurve(min_rate, max_latency)
results = []
for i, frame_size in enumerate(frame_sizes):
slot = i % n_slots
service = service_curve_for_slot(slot, frame_size)
# Maximum delay for flow using this slot
# Assuming worst-case arrival right after slot
max_delay = cycle_time - slot_duration + frame_size / link_speed
results.append({
'slot': slot,
'frame_size': frame_size,
'guaranteed_rate': service.rate,
'max_latency': service.latency,
'max_delay_with_transmission': max_delay,
'cycle_time': cycle_time,
'slot_duration': slot_duration
})
return results
@staticmethod
def credit_based_shaper(critical_bandwidth: float,
idle_slope: float,
send_slope: float,
max_frame_size: float) -> Dict:
"""
Analyze Credit-Based Shaper (802.1Qav)
Used for Audio Video Bridging (AVB)
"""
# Credit increases at idle_slope when not sending
# Decreases at send_slope when sending
# Bounded by [0, maxCredit]
max_credit = max_frame_size * (idle_slope / send_slope)
# Service curve approximation
# Can send when credit >= frame_size
# Credit builds at rate idle_slope
# Minimum service rate
min_rate = idle_slope
# Maximum burst: can send max_credit worth of data at once
max_burst = max_credit
# Latency: time to build enough credit for first frame
latency = max_frame_size / idle_slope
service = ServiceCurve(min_rate, latency)
return {
'idle_slope': idle_slope,
'send_slope': send_slope,
'max_credit': max_credit,
'max_frame_size': max_frame_size,
'service_curve_rate': service.rate,
'service_curve_latency': service.latency,
'max_burst': max_burst,
'notes': 'Simplified analysis, actual CBS more complex'
}
# Practical application: Industrial Ethernet
class IndustrialNetworkDesign:
"""Design industrial networks with deterministic guarantees"""
def __init__(self):
self.machines = {}
self.switches = {}
self.flows = {}
def add_machine(self, machine_id: str,
cycle_time: float,
data_per_cycle: float):
"""Add a machine with periodic traffic"""
self.machines[machine_id] = {
'cycle_time': cycle_time,
'data_per_cycle': data_per_cycle,
'arrival_curve': ArrivalCurve(
rate=data_per_cycle / cycle_time,
burst=data_per_cycle # One cycle's worth
)
}
def add_switch(self, switch_id: str, speed: float):
"""Add a switch with given speed"""
# Simple service curve: rate = speed, latency = transmission delay
# For accurate analysis, include processing delay
processing_delay = 0.000010 # 10 microseconds
self.switches[switch_id] = ServiceCurve(speed, processing_delay)
def add_flow(self, flow_id: str,
source: str,
destination: str,
path: List[str],
priority: int = 0):
"""Add a data flow"""
if source not in self.machines:
raise ValueError(f"Source machine {source} not found")
arrival = self.machines[source]['arrival_curve']
self.flows[flow_id] = {
'source': source,
'destination': destination,
'path': path,
'priority': priority,
'arrival': arrival
}
def verify_schedulability(self) -> Dict:
"""Verify all flows meet deadlines"""
results = {
'feasible': True,
'violations': [],
'flow_analysis': []
}
for flow_id, flow in self.flows.items():
arrival = flow['arrival']
path = flow['path']
# Combine service along path
if not path:
continue
            # Check each switch in the path exists (track this per flow, so
            # one bad flow doesn't skip the analysis of the others)
            path_ok = True
            for switch_id in path:
                if switch_id not in self.switches:
                    results['feasible'] = False
                    results['violations'].append(
                        f"Flow {flow_id}: Switch {switch_id} not found"
                    )
                    path_ok = False
                    break
            if not path_ok:
                continue
# Combine service curves
combined_service = self.switches[path[0]]
for switch_id in path[1:]:
combined_service = NetworkCalculus.concatenation(
combined_service, self.switches[switch_id]
)
# Calculate delay
delay = NetworkCalculus.delay_bound(arrival, combined_service)
# For industrial systems, deadline is typically cycle time
source_machine = self.machines[flow['source']]
deadline = source_machine['cycle_time']
meets_deadline = delay <= deadline
if not meets_deadline:
results['feasible'] = False
results['violations'].append(
f"Flow {flow_id}: Delay {delay:.6f}s > Deadline {deadline:.6f}s"
)
results['flow_analysis'].append({
'flow_id': flow_id,
'path': path,
'arrival_rate': arrival.rate,
'service_rate': combined_service.rate,
'calculated_delay': delay,
'deadline': deadline,
'meets_deadline': meets_deadline,
'utilization': arrival.rate / combined_service.rate
})
return results
def calculate_network_utilization(self) -> Dict:
"""Calculate utilization of each network element"""
utilizations = {}
for switch_id, service in self.switches.items():
total_rate = 0
# Sum rates of all flows through this switch
for flow_id, flow in self.flows.items():
if switch_id in flow['path']:
total_rate += flow['arrival'].rate
utilization = total_rate / service.rate
utilizations[switch_id] = {
'total_rate': total_rate,
'service_rate': service.rate,
'utilization': utilization,
'stable': total_rate <= service.rate
}
return utilizations
Network Calculus Key Results:
- Arrival Curve α(t): Bounds traffic arrivals
- Service Curve β(t): Bounds service provided
- Backlog Bound: sup_t [α(t) - β(t)]
- Delay Bound: Horizontal deviation between α and β
- Concatenation: β1 ⊗ β2 for nodes in series
- Leftover Service: β' = β ⊖ α for remaining capacity
Applications:
- TSN/AVB: Time-sensitive networking standards
- Industrial Ethernet: Factory automation
- 5G URLLC: Ultra-reliable low-latency communication
- Real-time systems: Hard deadline guarantees
🌊 18. Fluid Flow Models: Continuous Traffic Approximation
Modeling traffic as continuous flow rather than discrete packets.
import numpy as np
from scipy import integrate, optimize
from typing import Callable, List, Tuple
import math
class FluidFlowModel:
"""Continuous fluid approximation of network traffic"""
def __init__(self, initial_state: float = 0):
self.state = initial_state # Queue length or buffer occupancy
self.time = 0
def ode_system(self, t: float, y: float,
arrival_rate: Callable[[float], float],
service_rate: Callable[[float], float]) -> float:
"""
Ordinary Differential Equation for queue dynamics
        dy/dt = λ(t) - μ(t)          for y > 0
        dy/dt = max(0, λ(t) - μ(t))  for y = 0
"""
lambda_t = arrival_rate(t)
mu_t = service_rate(t)
if y > 0:
return lambda_t - mu_t
else:
return max(0, lambda_t - mu_t)
def simulate(self,
arrival_rate: Callable[[float], float],
service_rate: Callable[[float], float],
duration: float,
dt: float = 0.01) -> Tuple[np.ndarray, np.ndarray]:
"""
Simulate fluid queue using Euler method
Returns: (time_points, queue_lengths)
"""
n_steps = int(duration / dt)
times = np.linspace(0, duration, n_steps)
queues = np.zeros(n_steps)
current_queue = self.state
for i, t in enumerate(times):
# Update queue
if current_queue > 0:
dq_dt = arrival_rate(t) - service_rate(t)
else:
dq_dt = max(0, arrival_rate(t) - service_rate(t))
current_queue += dq_dt * dt
current_queue = max(0, current_queue) # Non-negative
queues[i] = current_queue
return times, queues
def steady_state_analysis(self,
arrival_rate: float,
service_rate: float) -> Dict:
"""
Steady-state analysis of fluid queue
For constant rates ฮป and ฮผ
"""
if arrival_rate >= service_rate:
# Unstable, queue grows without bound
return {
'stable': False,
'utilization': arrival_rate / service_rate,
'mean_queue': float('inf'),
'mean_delay': float('inf')
}
# M/M/1 fluid approximation
rho = arrival_rate / service_rate
# For fluid model with exponential assumptions
mean_queue = rho / (1 - rho)
mean_delay = mean_queue / arrival_rate
# Distribution (approximate)
def queue_distribution(q: float) -> float:
            # P(queue > q) = e^(-(μ-λ) q)
return math.exp(-(service_rate - arrival_rate) * q)
return {
'stable': True,
'arrival_rate': arrival_rate,
'service_rate': service_rate,
'utilization': rho,
'mean_queue': mean_queue,
'mean_delay': mean_delay,
'p_queue_exceeds': queue_distribution,
'p_queue_exceeds_1': queue_distribution(1),
'p_queue_exceeds_10': queue_distribution(10)
}
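A minimal sketch: drive the fluid queue with a temporary traffic spike and watch the backlog build and drain (all rates are arbitrary):

model = FluidFlowModel()
arrival = lambda t: 15.0 if 10 <= t < 20 else 5.0   # spike between t=10 and t=20
service = lambda t: 10.0                            # constant service rate
times, queues = model.simulate(arrival, service, duration=60, dt=0.01)
print(f"peak backlog: {queues.max():.1f} at t={times[queues.argmax()]:.1f}")
print(f"backlog at t=60: {queues[-1]:.2f}")  # drains back to ~0 after the spike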
class NetworkFluidModel:
"""Fluid model of network with multiple queues"""
def __init__(self, n_queues: int):
self.n_queues = n_queues
self.queues = [0] * n_queues
        self.routing = np.zeros((n_queues, n_queues))  # Default: all traffic departs after one service
    def set_routing(self, routing_matrix: np.ndarray):
        """
        routing_matrix[i][j] = fraction of traffic routed from queue i to j;
        the remainder, 1 - sum_j routing_matrix[i][j], leaves the system
        """
        if routing_matrix.shape != (self.n_queues, self.n_queues):
            raise ValueError("Routing matrix must be n x n")
        # Rows must sum to at most 1 (flow conservation)
        for i in range(self.n_queues):
            if np.sum(routing_matrix[i]) > 1 + 1e-9:
                raise ValueError(f"Row {i} sums to more than 1")
        self.routing = routing_matrix
def network_ode(self, t: float, y: np.ndarray,
external_arrivals: np.ndarray,
service_rates: np.ndarray) -> np.ndarray:
"""
ODE system for network of fluid queues
dy_i/dt = external_i + ฮฃ_j y_j * ฮผ_j * r_ji - ฮผ_i * y_i
where y_i = queue length at node i
"""
dydt = np.zeros(self.n_queues)
        # Effective service rates: taper toward zero as a queue empties
        # so we never drain more fluid than is present
        current_service = np.minimum(service_rates, y)
# Flow balance
for i in range(self.n_queues):
# External arrivals
inflow = external_arrivals[i]
# Internal arrivals from other queues
for j in range(self.n_queues):
if i != j:
inflow += current_service[j] * self.routing[j][i]
# Outflow (service)
outflow = current_service[i]
dydt[i] = inflow - outflow
return dydt
def simulate_network(self,
external_arrivals: Callable[[float], np.ndarray],
service_rates: Callable[[float], np.ndarray],
duration: float,
dt: float = 0.01) -> Tuple[np.ndarray, List[np.ndarray]]:
"""Simulate network of fluid queues"""
        n_steps = int(duration / dt)
        times = np.arange(n_steps) * dt
queue_histories = [np.zeros(n_steps) for _ in range(self.n_queues)]
current_queues = np.array(self.queues, dtype=float)
for step, t in enumerate(times):
# Get current rates
ext_arr = external_arrivals(t)
svc_rates = service_rates(t)
# Update using Euler method
dq_dt = self.network_ode(t, current_queues, ext_arr, svc_rates)
current_queues += dq_dt * dt
current_queues = np.maximum(current_queues, 0) # Non-negative
# Store
for i in range(self.n_queues):
queue_histories[i][step] = current_queues[i]
return times, queue_histories
def steady_state_network(self,
external_arrivals: np.ndarray,
service_rates: np.ndarray) -> Dict:
"""
Calculate steady-state of network
Solve: 0 = ฮป + R^T * ฮผ - ฮผ
where ฮป = external arrivals, R = routing matrix
"""
# Solve linear equations for effective arrival rates
# ฮ = ฮป + R^T ฮ
# (I - R^T) ฮ = ฮป
I = np.eye(self.n_queues)
A = I - self.routing.T
try:
effective_arrivals = np.linalg.solve(A, external_arrivals)
except np.linalg.LinAlgError:
# Singular matrix, use least squares
effective_arrivals = np.linalg.lstsq(A, external_arrivals, rcond=None)[0]
# Check stability: effective_arrivals_i < service_rates_i
stable = all(effective_arrivals[i] < service_rates[i]
for i in range(self.n_queues))
if not stable:
return {
'stable': False,
'effective_arrivals': effective_arrivals,
'service_rates': service_rates,
'unstable_nodes': [i for i in range(self.n_queues)
if effective_arrivals[i] >= service_rates[i]]
}
# Calculate queue lengths (M/M/1 approximation)
utilizations = effective_arrivals / service_rates
mean_queues = utilizations / (1 - utilizations)
mean_delays = mean_queues / effective_arrivals
# Total network
total_arrival = np.sum(external_arrivals)
total_queue = np.sum(mean_queues)
total_delay = total_queue / total_arrival if total_arrival > 0 else 0
return {
'stable': True,
'effective_arrivals': effective_arrivals.tolist(),
'utilizations': utilizations.tolist(),
'mean_queues': mean_queues.tolist(),
'mean_delays': mean_delays.tolist(),
'total_arrival': total_arrival,
'total_queue': total_queue,
'total_delay': total_delay,
            'bottleneck_node': int(np.argmax(utilizations))
}
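# --- Quick illustration (my own example numbers) ---
# Two-node tandem: 80% of node 0's output feeds node 1, the rest departs;
# everything leaving node 1 exits the system
tandem = NetworkFluidModel(n_queues=2)
tandem.set_routing(np.array([[0.0, 0.8],
                             [0.0, 0.0]]))
print(tandem.steady_state_network(
    external_arrivals=np.array([5.0, 1.0]),
    service_rates=np.array([10.0, 8.0])))
# Traffic equations give Λ = (5.0, 5.0); both nodes stable (ρ = 0.5, 0.625)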
class TCPFluidModel:
"""Fluid model of TCP congestion control"""
def __init__(self,
num_flows: int,
link_capacity: float,
propagation_delay: float = 0.1):
self.num_flows = num_flows
self.link_capacity = link_capacity
self.prop_delay = propagation_delay
# State variables
self.window_sizes = [1.0] * num_flows # Congestion window
self.queue_length = 0.0
self.rtts = [2 * propagation_delay] * num_flows
def tcp_ode(self, t: float,
w: List[float], # window sizes
q: float) -> Tuple[List[float], float]:
"""
ODE model of TCP Reno/RED
Based on: "A Fluid-based Analysis of a Network of AQM Routers
Supporting TCP Flows" by Misra, Gong, Towsley
"""
# Calculate total arrival rate
total_rate = sum(wi / rtti for wi, rtti in zip(w, self.rtts))
        # Queue dynamics: grows when offered load exceeds capacity;
        # an empty queue cannot drain below zero
        if q > 0:
            dq_dt = total_rate - self.link_capacity
        else:
            dq_dt = max(0.0, total_rate - self.link_capacity)
# TCP window dynamics
dw_dt = []
for i in range(self.num_flows):
# Current RTT = propagation + queueing delay
current_rtt = self.prop_delay + q / self.link_capacity
self.rtts[i] = current_rtt
            # TCP Reno fluid model: additive increase of one packet per RTT;
            # loss events occur at roughly the packet rate (w/RTT) times the
            # loss probability (simplified RED), and each one halves the window
            p_loss = self._red_loss_probability(q)
            # dw/dt = 1/RTT - (w/2)(w/RTT)p  (Misra-Gong-Towsley form)
            dwdt = 1/current_rtt - (w[i] ** 2) * p_loss / (2 * current_rtt)
dw_dt.append(dwdt)
return dw_dt, dq_dt
def _red_loss_probability(self, q: float) -> float:
"""Random Early Detection (RED) loss probability"""
min_thresh = 5.0
max_thresh = 15.0
max_p = 0.1
if q <= min_thresh:
return 0.0
elif q >= max_thresh:
return max_p
else:
return max_p * (q - min_thresh) / (max_thresh - min_thresh)
def simulate_tcp(self, duration: float, dt: float = 0.01) -> Dict:
"""Simulate TCP dynamics"""
        n_steps = int(duration / dt)
        times = np.arange(n_steps) * dt
# History
window_history = [np.zeros(n_steps) for _ in range(self.num_flows)]
queue_history = np.zeros(n_steps)
rate_history = [np.zeros(n_steps) for _ in range(self.num_flows)]
# Initial state
w = self.window_sizes.copy()
q = self.queue_length
for step, t in enumerate(times):
# Store current state
for i in range(self.num_flows):
window_history[i][step] = w[i]
rate_history[i][step] = w[i] / self.rtts[i] if self.rtts[i] > 0 else 0
queue_history[step] = q
# Update using Euler method
dw_dt, dq_dt = self.tcp_ode(t, w, q)
for i in range(self.num_flows):
w[i] += dw_dt[i] * dt
w[i] = max(w[i], 1.0) # Minimum window
q += dq_dt * dt
q = max(q, 0.0) # Non-negative queue
# Statistics
final_rates = [w[i] / self.rtts[i] for i in range(self.num_flows)]
total_rate = sum(final_rates)
utilization = total_rate / self.link_capacity
avg_queue = np.mean(queue_history)
avg_rtt = np.mean(self.rtts)
# Fairness index (Jain's index)
rates = final_rates
n = len(rates)
numerator = sum(rates) ** 2
denominator = n * sum(r**2 for r in rates)
fairness = numerator / denominator if denominator > 0 else 0
return {
'times': times,
'windows': window_history,
'queue': queue_history,
'rates': rate_history,
'final_rates': final_rates,
'total_rate': total_rate,
'utilization': utilization,
'average_queue': avg_queue,
'average_rtt': avg_rtt,
'fairness_index': fairness,
'converged': abs(utilization - 1.0) < 0.1 # Close to full utilization
}
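# --- Quick illustration (my own example numbers) ---
# Ten Reno flows sharing a 100 packet/s link with 50 ms propagation delay
tcp_demo = TCPFluidModel(num_flows=10, link_capacity=100.0,
                         propagation_delay=0.05)
tcp_result = tcp_demo.simulate_tcp(duration=60.0, dt=0.005)
print(f"utilization ≈ {tcp_result['utilization']:.2f}, "
      f"Jain fairness ≈ {tcp_result['fairness_index']:.3f}")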
class FluidApproximationAccuracy:
"""Analyze accuracy of fluid approximations"""
@staticmethod
def compare_discrete_fluid(arrival_process: str,
service_process: str,
duration: float,
num_samples: int = 1000) -> Dict:
"""
Compare discrete event simulation vs fluid approximation
"""
# Parameters
mean_arrival = 10.0 # packets/second
mean_service = 12.0 # packets/second
if arrival_process == 'poisson':
# Discrete: Poisson arrivals
interarrival_times = np.random.exponential(1/mean_arrival, num_samples)
arrival_times = np.cumsum(interarrival_times)
arrival_times = arrival_times[arrival_times <= duration]
elif arrival_process == 'constant':
# Constant rate
interval = 1 / mean_arrival
arrival_times = np.arange(0, duration, interval)
else:
raise ValueError(f"Unknown arrival process: {arrival_process}")
if service_process == 'exponential':
service_times = np.random.exponential(1/mean_service, len(arrival_times))
elif service_process == 'constant':
service_times = np.full(len(arrival_times), 1/mean_service)
else:
raise ValueError(f"Unknown service process: {service_process}")
# Discrete event simulation
departure_times = []
current_time = 0
for i, arrival in enumerate(arrival_times):
start_service = max(arrival, current_time)
departure = start_service + service_times[i]
departure_times.append(departure)
current_time = departure
# Calculate queue length over time (discrete)
time_points = np.linspace(0, duration, 1000)
discrete_queue = np.zeros_like(time_points)
for i, t in enumerate(time_points):
arrivals_by_t = np.sum(arrival_times <= t)
departures_by_t = np.sum(np.array(departure_times) <= t)
discrete_queue[i] = arrivals_by_t - departures_by_t
# Fluid approximation
fluid_model = FluidFlowModel()
times, fluid_queue = fluid_model.simulate(
arrival_rate=lambda t: mean_arrival,
service_rate=lambda t: mean_service,
duration=duration,
dt=0.01
)
# Interpolate fluid to same time points
if len(times) > 1 and len(fluid_queue) > 1:
fluid_interp = interpolate.interp1d(times, fluid_queue,
bounds_error=False,
fill_value=0)
fluid_at_points = fluid_interp(time_points)
else:
fluid_at_points = np.zeros_like(time_points)
# Calculate errors
mae = np.mean(np.abs(discrete_queue - fluid_at_points))
mse = np.mean((discrete_queue - fluid_at_points) ** 2)
max_error = np.max(np.abs(discrete_queue - fluid_at_points))
# Steady-state comparison
discrete_mean = np.mean(discrete_queue)
fluid_mean = np.mean(fluid_queue)
# Theoretical M/M/1
rho = mean_arrival / mean_service
theoretical_mean = rho / (1 - rho) if rho < 1 else float('inf')
return {
'arrival_process': arrival_process,
'service_process': service_process,
'utilization': rho,
'discrete_mean_queue': discrete_mean,
'fluid_mean_queue': fluid_mean,
'theoretical_mean_queue': theoretical_mean,
'mean_absolute_error': mae,
'mean_squared_error': mse,
'max_error': max_error,
'relative_error_mean': abs(fluid_mean - discrete_mean) / discrete_mean if discrete_mean > 0 else 0,
'fluid_accuracy': 'good' if mae < 1.0 else 'fair' if mae < 3.0 else 'poor',
'time_points': time_points,
'discrete_queue': discrete_queue,
'fluid_queue': fluid_at_points
}
@staticmethod
    def scaling_accuracy(duration: float = 100) -> List[Dict]:
        """How accuracy improves with scale (averaging over more queues)"""
results = []
for n in [1, 2, 5, 10, 20, 50, 100]:
# Simulate n independent M/M/1 queues
total_discrete = 0
total_fluid = 0
for _ in range(n):
comparison = FluidApproximationAccuracy.compare_discrete_fluid(
'poisson', 'exponential', duration, num_samples=1000
)
total_discrete += comparison['discrete_mean_queue']
total_fluid += comparison['fluid_mean_queue']
avg_discrete = total_discrete / n
avg_fluid = total_fluid / n
error = abs(avg_fluid - avg_discrete) / avg_discrete if avg_discrete > 0 else 0
results.append({
'num_flows': n,
'avg_discrete': avg_discrete,
'avg_fluid': avg_fluid,
'relative_error': error,
                # Law of large numbers: relative error should shrink ~ 1/sqrt(n)
                'fluid_law_of_large_numbers': error < 1 / math.sqrt(n)
})
return results
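To see where the approximation earns its keep (and where it doesn't), here's a short illustrative run of the comparison above on an M/M/1-style workload (λ = 10 and μ = 12 are hard-coded inside compare_discrete_fluid):
comparison = FluidApproximationAccuracy.compare_discrete_fluid(
    arrival_process='poisson',
    service_process='exponential',
    duration=50.0)
print(f"discrete mean queue:    {comparison['discrete_mean_queue']:.2f}")
print(f"fluid mean queue:       {comparison['fluid_mean_queue']:.2f}")
print(f"theoretical M/M/1 mean: {comparison['theoretical_mean_queue']:.2f}")
print(f"verdict: {comparison['fluid_accuracy']}")
Expect an unflattering verdict here: with constant rates and λ < μ, the deterministic fluid queue stays empty, while the M/M/1 mean is ρ/(1-ρ) ≈ 5. The fluid model captures transient overload and large aggregates well, but it misses variability-driven queueing in a single lightly loaded queue.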
Fluid Flow Model Insights:
- Continuous Approximation: Treat traffic as a fluid flow rather than discrete packets
- ODE Models: Describe system dynamics with differential equations
- Scalability: Accuracy improves with many flows (law of large numbers)
- Analytical Tractability: Easier to analyze than discrete-event simulation
- TCP Modeling: Captures congestion-control dynamics (e.g., Reno with RED)
Applications:
- Large-scale networks: Data centers, backbone networks
- Congestion control analysis: TCP variants, AQM schemes
- Performance bounds: Worst-case analysis
- Control theory: Design of network controllers
🎯 Conclusion: From Theory to Practice
We've journeyed through the deep theoretical foundations of distributed systems. Let's summarize the key takeaways:
class DistributedTheoryCheatSheet:
"""Quick reference for distributed systems theory"""
@staticmethod
def when_to_use_what():
return {
'clock_sync': {
'lamport': 'Causal ordering without physical clocks',
'vector': 'Detect concurrency, more expensive',
'hybrid': 'Mix physical and logical, good balance',
'truetime': 'Global consistency (Spanner, requires HW)',
'itc': 'Dynamic systems, fork/join operations'
},
'consensus': {
'paxos': 'Classic, proven, complex',
'raft': 'Understandable, production-ready',
'byzantine': 'Adversarial environments',
'flp': 'Remember: async + deterministic = impossible'
},
'consistency': {
'linearizable': 'Strongest, banking, locks',
'sequential': 'Weaker but faster',
'causal': 'Preserves happens-before',
'eventual': 'Weak, high availability',
'red_blue': 'Mix strong and eventual'
},
'load_balancing': {
'power_of_d': 'd=2 gives most benefit',
'consistent_hashing': 'Minimal movement',
'rendezvous': 'Perfect balance, O(n) cost',
'jump_hash': 'Perfect balance, O(log n), static'
},
'performance_modeling': {
'queueing_theory': 'Predict means, steady-state',
'network_calculus': 'Worst-case bounds',
'fluid_models': 'Continuous approximation',
'tail_analysis': 'Focus on p99, p999'
},
'concurrency': {
'lock_based': 'Simple, can deadlock',
'lock_free': 'Progress guaranteed, individual starvation',
'wait_free': 'Bounded steps, most expensive',
'obstruction_free': 'Weakest, progress with no contention'
}
}
@staticmethod
def practical_advice():
return [
"1. Start with Raft, not Paxos (unless you need Byzantine)",
"2. Use Hybrid Logical Clocks for most applications",
"3. Power of 2 choices for load balancing",
"4. Measure p99, p999, not just averages",
"5. Use queueing theory for capacity planning",
"6. Consider consistency trade-offs (CAP, PACELC)",
"7. Implement hedging for tail latency reduction",
"8. Use network calculus for hard real-time systems",
"9. Fluid models good for large-scale analysis",
"10. Theory informs, but measure your actual system"
]
@staticmethod
def next_steps():
return {
'read': [
"Leslie Lamport's papers",
"Google's Spanner, Chubby papers",
"Amazon's Dynamo paper",
"Twitter's Manhattan paper"
],
'implement': [
"A simple Raft implementation",
"Vector clocks for causal consistency",
"Power of d choices load balancer",
"Queueing theory simulator"
],
'experiment': [
"Measure tail latency in your system",
"Try different consistency levels",
"Simulate network partitions",
"Benchmark lock-free vs lock-based"
]
}
The Big Picture:
Distributed systems theory isn't just academic: it's the foundation of every cloud service, database, and large-scale application. Understanding these concepts helps you:
- Make informed design decisions based on trade-offs
- Debug complex issues by understanding root causes
- Plan for scale using mathematical models
- Innovate new solutions by combining ideas
Remember These Principles:
- There are no perfect solutions, only trade-offs (CAP, FLP, etc.)
- Time and order are fundamental challenges
- Scalability requires new ways of thinking
- Theory guides, but practice decides
🚀 Your Distributed Systems Journey Continues
You now have a comprehensive understanding of distributed systems theory. But this is just the beginning. The real learning happens when you:
- Build systems using these principles
- Measure everything and compare to theory
- Contribute back to the community
Share your experiences! What theoretical concepts have you applied in practice? Which ones surprised you with their practical relevance? Let's continue the conversation and build better distributed systems together!