# Write AI Policies That Actually Work: Custom Rule Examples
Most AI governance policies are as useful as a chocolate teapot — they look impressive in meetings but melt the moment they touch production heat.
## The Problem: Your AI Policies Are Theater, Not Engineering
Here's what I see in every "AI governance" document that crosses my desk:
- "AI systems must be fair and unbiased" (What does that mean? Fair to whom? Measured how?)
- "Models must be regularly monitored" (How often is regular? What metrics matter?)
- "Data privacy must be maintained" (Which data? What constitutes a violation?)
These aren't policies. They're wishes written in corporate speak.
Meanwhile, your AI agents are burning through API quotas, hallucinating customer data, and making decisions that would make a compliance officer weep. You need rules that actually fire when something goes wrong, not mission statements that make executives feel better about their AI initiatives.
The gap between "we have AI governance" and "our AI governance actually works" is filled with custom rules that trigger on specific, measurable violations. Not vibes. Not best practices. Executable code that says "this specific thing happened, therefore that specific action must be taken."
## Architecture: How Policy Rules Actually Work
Here's how Airblackbox turns your governance requirements into executable policies:
```mermaid
graph TD
    A[AI Agent Request] --> B[Gateway Proxy]
    B --> C[LLM Provider]
    C --> D[Response Capture]
    D --> E[Rule Engine]
    E --> F{Policy Check}
    F -->|Pass| G[Log & Forward]
    F -->|Fail| H[Block & Alert]
    F -->|Warn| I[Flag & Forward]
    J[Custom Rules] --> E
    K[EU AI Act Rules] --> E
    L[Rate Limits] --> E
    H --> M[Compliance Dashboard]
    I --> M
    G --> N[Observability Store]
    style F fill:#ff6b6b
    style J fill:#4ecdc4
    style M fill:#45b7d1
```
The Gateway sits between your agent and the LLM, captures everything, runs your custom rules against the traffic, and either blocks, warns, or passes through based on what it finds.
Think of it as a firewall, but instead of blocking ports, it blocks prompts that violate your policies.
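Before building full rules, it helps to see the shape of the contract every rule follows: an `evaluate` method that receives the request and returns a result carrying a pass/fail verdict and an action. Here's a minimal, self-contained sketch of that pattern. The real airblackbox classes may differ; `RuleResult` and `MaxPromptLengthRule` here are illustrative stand-ins, not the library's actual API.

```python
from dataclasses import dataclass, field

# Illustrative sketch of the rule contract used throughout this post.
# The actual airblackbox types may differ in detail.
@dataclass
class RuleResult:
    passed: bool
    message: str
    action: str = "pass"  # "pass", "block", "warn", or "modify"
    metadata: dict = field(default_factory=dict)

class MaxPromptLengthRule:
    """Block any prompt longer than a fixed character budget."""

    def __init__(self, max_chars=4000):
        self.max_chars = max_chars

    def evaluate(self, request, response, metadata):
        prompt = request.get("prompt", "")
        if len(prompt) > self.max_chars:
            return RuleResult(
                passed=False,
                message=f"Prompt length {len(prompt)} exceeds {self.max_chars}",
                action="block",
            )
        return RuleResult(passed=True, message="Prompt length OK")

rule = MaxPromptLengthRule(max_chars=10)
result = rule.evaluate({"prompt": "x" * 50}, None, {})
print(result.action)  # block
```

Every rule in this post follows this shape: inspect the request, decide, return a structured verdict the gateway can act on.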
## Implementation: Building Custom Policy Rules

Let's build three real rules that solve actual problems developers face.

### Rule 1: Rate Limiting by User Role

**Problem:** Your intern shouldn't be able to burn through the entire monthly GPT-4 budget in one afternoon experimenting with creative writing prompts.
```python
# custom_rules/rate_limiting.py
from datetime import datetime

import redis

from airblackbox.rules import Rule, RuleResult


class UserRoleLimiter(Rule):
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        self.limits = {
            'intern': {'requests_per_hour': 10, 'tokens_per_day': 1000},
            'developer': {'requests_per_hour': 50, 'tokens_per_day': 10000},
            'admin': {'requests_per_hour': 200, 'tokens_per_day': 50000},
        }

    def evaluate(self, request, response, metadata):
        user_id = metadata.get('user_id')
        user_role = metadata.get('user_role', 'intern')  # Default to most restrictive

        if user_role not in self.limits:
            return RuleResult(
                passed=False,
                message=f"Unknown user role: {user_role}",
                action="block"
            )

        # Check hourly request limit (fixed window keyed by the current hour)
        hour_key = f"requests:{user_id}:{datetime.now().strftime('%Y%m%d%H')}"
        hourly_requests = self.redis.incr(hour_key)
        if hourly_requests == 1:
            self.redis.expire(hour_key, 3600)  # Set TTL once, on the window's first request

        if hourly_requests > self.limits[user_role]['requests_per_hour']:
            return RuleResult(
                passed=False,
                message=f"User {user_id} exceeded hourly limit "
                        f"({self.limits[user_role]['requests_per_hour']})",
                action="block",
                metadata={'current_requests': hourly_requests}
            )

        # Check daily token limit
        day_key = f"tokens:{user_id}:{datetime.now().strftime('%Y%m%d')}"
        current_tokens = int(self.redis.get(day_key) or 0)
        estimated_tokens = len(request.get('prompt', '')) // 4  # Rough ~4 chars/token estimate

        if current_tokens + estimated_tokens > self.limits[user_role]['tokens_per_day']:
            return RuleResult(
                passed=False,
                message=f"User {user_id} would exceed daily token limit",
                action="block",
                metadata={'current_tokens': current_tokens,
                          'estimated_tokens': estimated_tokens}
            )

        # Update token counter after successful validation
        new_total = self.redis.incrby(day_key, estimated_tokens)
        if new_total == estimated_tokens:
            self.redis.expire(day_key, 86400)  # Set TTL once, when the day's counter is created

        return RuleResult(
            passed=True,
            message=f"Rate limit check passed for {user_role}",
            metadata={'requests_remaining':
                      self.limits[user_role]['requests_per_hour'] - hourly_requests}
        )
```
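The Redis calls above implement a fixed-window counter: each key embeds the user and the truncated timestamp, so a new window simply means a new key. Stripped of Redis, the same logic looks like this (the `FixedWindowCounter` class is an illustrative in-memory sketch, not part of airblackbox):

```python
from datetime import datetime

# In-memory sketch of the fixed-window counting used in UserRoleLimiter.
# Windows are keyed by user + truncated timestamp, exactly like the Redis keys.
class FixedWindowCounter:
    def __init__(self):
        self.counters = {}

    def incr(self, user_id, now=None):
        now = now or datetime.now()
        key = f"requests:{user_id}:{now.strftime('%Y%m%d%H')}"
        self.counters[key] = self.counters.get(key, 0) + 1
        return self.counters[key]

counter = FixedWindowCounter()
ts = datetime(2024, 1, 1, 9, 30)
for _ in range(3):
    count = counter.incr("alice", now=ts)
print(count)  # 3 requests counted in the 09:00 window

# A request in the next hour starts a fresh window (fresh key, count resets)
print(counter.incr("alice", now=datetime(2024, 1, 1, 10, 0)))  # 1
```

Note the known trade-off of fixed windows: a user can burst at the end of one window and the start of the next. If that matters for your budget, a sliding-window or token-bucket scheme is the usual upgrade.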
### Rule 2: PII Detection and Redaction

**Problem:** Your customer service agent just tried to send someone's SSN to OpenAI. This is not ideal for anyone involved.
```python
# custom_rules/pii_protection.py
import re

from airblackbox.rules import Rule, RuleResult


class PIIProtectionRule(Rule):
    def __init__(self):
        self.patterns = {
            'ssn': r'\b\d{3}-?\d{2}-?\d{4}\b',
            'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
        }

    def evaluate(self, request, response, metadata):
        prompt = request.get('prompt', '')
        violations = []
        redacted_prompt = prompt

        for pii_type, pattern in self.patterns.items():
            for match in re.finditer(pattern, prompt, re.IGNORECASE):
                violations.append({
                    'type': pii_type,
                    'value': match.group(),
                    'position': match.span()
                })
                # Redact the PII
                redacted_prompt = redacted_prompt.replace(
                    match.group(),
                    f'[REDACTED_{pii_type.upper()}]'
                )

        if violations:
            # For high-risk PII, block entirely
            high_risk = {'ssn', 'credit_card'}
            if any(v['type'] in high_risk for v in violations):
                return RuleResult(
                    passed=False,
                    message=f"High-risk PII detected: {[v['type'] for v in violations]}",
                    action="block",
                    metadata={'violations': violations}
                )

            # For lower-risk PII, redact and warn
            return RuleResult(
                passed=True,
                message=f"PII detected and redacted: {[v['type'] for v in violations]}",
                action="modify",
                metadata={
                    'violations': violations,
                    'modified_prompt': redacted_prompt
                }
            )

        return RuleResult(passed=True, message="No PII detected")
```
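The redaction path is easy to exercise on its own. Here's the same find-and-replace logic as a standalone snippet (patterns copied from the rule, trimmed to two types), so you can sanity-check it without the gateway:

```python
import re

# Standalone demo of the redaction logic from PIIProtectionRule,
# using the same patterns the rule defines.
patterns = {
    'ssn': r'\b\d{3}-?\d{2}-?\d{4}\b',
    'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
}

def redact(prompt):
    redacted = prompt
    for pii_type, pattern in patterns.items():
        # Find matches in the original text, replace them in the working copy
        for match in re.finditer(pattern, prompt):
            redacted = redacted.replace(match.group(), f'[REDACTED_{pii_type.upper()}]')
    return redacted

print(redact("Contact jane@example.com about SSN 123-45-6789"))
# Contact [REDACTED_EMAIL] about SSN [REDACTED_SSN]
```

Regex-based detection like this catches well-formatted PII only; anything free-form (names, addresses) needs an NER model or a dedicated service on top.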
### Rule 3: Content Appropriateness with Business Context

**Problem:** Your legal research agent keeps trying to get ChatGPT to write creative fiction about murder trials. Your lawyers are not amused.
```python
# custom_rules/content_appropriateness.py
import openai

from airblackbox.rules import Rule, RuleResult


class ContentAppropriatenessRule(Rule):
    def __init__(self, allowed_domains=None):
        self.client = openai.OpenAI()  # For the moderation API
        self.allowed_domains = allowed_domains or []
        # Business context keywords that make otherwise flagged content acceptable
        self.business_contexts = {
            'legal_research': ['case law', 'legal precedent', 'court ruling', 'litigation'],
            'security_research': ['vulnerability', 'penetration test', 'security audit'],
            'medical_research': ['clinical trial', 'medical study', 'patient care'],
            'content_moderation': ['content policy', 'moderation guidelines', 'user safety'],
        }

    def evaluate(self, request, response, metadata):
        prompt = request.get('prompt', '')
        user_domain = metadata.get('user_domain', 'general')

        # Run OpenAI's moderation check; fail closed if it errors
        try:
            moderation_result = self.client.moderations.create(input=prompt).results[0]
            flagged = moderation_result.flagged
            categories = moderation_result.categories
        except Exception as e:
            return RuleResult(
                passed=False,
                message=f"Moderation check failed: {e}",
                action="block"
            )

        if not flagged:
            return RuleResult(passed=True, message="Content passed moderation")

        # Content is flagged; collect the offending category names
        flagged_categories = [
            cat for cat, is_flagged in categories.model_dump().items() if is_flagged
        ]

        # Check if the user's domain allows this type of content
        if user_domain in self.business_contexts:
            context_keywords = self.business_contexts[user_domain]
            if any(keyword.lower() in prompt.lower() for keyword in context_keywords):
                return RuleResult(
                    passed=True,
                    message=f"Content flagged but allowed for {user_domain} context",
                    action="warn",
                    metadata={
                        'flagged_categories': flagged_categories,
                        'business_context': user_domain,
                        'justification': 'Business context exception applied'
                    }
                )

        # No business context exception, block the request
        return RuleResult(
            passed=False,
            message=f"Content flagged for: {', '.join(flagged_categories)}",
            action="block",
            metadata={'flagged_categories': flagged_categories}
        )
```
## Wiring It All Together
Now let's create a policy engine that runs all these rules:
```python
# policy_engine.py
from airblackbox import Gateway

from custom_rules.content_appropriateness import ContentAppropriatenessRule
from custom_rules.pii_protection import PIIProtectionRule
from custom_rules.rate_limiting import UserRoleLimiter

# Initialize the gateway with custom rules
gateway = Gateway()

# Add your custom rules; order matters (see Pitfalls below)
gateway.add_rule(PIIProtectionRule())  # Redact PII before anything else sees the prompt
gateway.add_rule(UserRoleLimiter())    # Count tokens on the redacted prompt
gateway.add_rule(ContentAppropriatenessRule(
    allowed_domains=['legal_research', 'security_research']
))

# Start the gateway
if __name__ == "__main__":
    gateway.start(host="0.0.0.0", port=8080)
```
## Pitfalls: What Will Break and How to Fix It

### 1. Rule Ordering Matters

**Problem:** Your PII redaction rule runs after your rate limiting rule, so you're counting tokens for text that gets modified.

**Solution:** Rules run in the order you add them. Put modification rules (like PII redaction) first, then validation rules (like rate limits).
```python
# Wrong order
gateway.add_rule(UserRoleLimiter())    # Counts original tokens
gateway.add_rule(PIIProtectionRule())  # Then redacts

# Right order
gateway.add_rule(PIIProtectionRule())  # Redacts first
gateway.add_rule(UserRoleLimiter())    # Counts redacted tokens
```
### 2. Performance Death by a Thousand Cuts

**Problem:** You added 20 rules that each make external API calls. Your response time went from 200ms to 5 seconds.

**Solution:** Cache expensive operations and use async where possible:
```python
import asyncio
from functools import lru_cache


class OptimizedPIIRule(Rule):
    @lru_cache(maxsize=1000)
    def _check_patterns(self, text):
        # Expensive regex work, cached per unique prompt text.
        # Note: lru_cache keys on all arguments, including self.
        return self._find_pii_violations(text)

    async def evaluate_async(self, request, response, metadata):
        # Push the blocking regex work onto a thread so the event loop stays free
        violations = await asyncio.to_thread(
            self._check_patterns, request.get('prompt', '')
        )
        return self._build_result(violations)
```
### 3. State Management in Distributed Systems

**Problem:** You're running multiple gateway instances behind a load balancer, and rate limits fire inconsistently because each instance keeps its own counters in its own local Redis.

**Solution:** Point every instance at a shared, highly available Redis backend:
```python
import redis.sentinel


class DistributedUserRoleLimiter(Rule):
    def __init__(self):
        # Use Redis Sentinel for high availability; every gateway instance
        # resolves the same master, so counters stay consistent
        sentinel = redis.sentinel.Sentinel([
            ('localhost', 26379),
            ('localhost', 26380),
            ('localhost', 26381),
        ])
        self.redis = sentinel.master_for('mymaster', socket_timeout=0.1)
```
## Measurement: How to Know It's Working
Your rules are only as good as your ability to measure their effectiveness. Here's how to build observability into your policy engine:
```python
# monitoring.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class RuleMetrics:
    rule_name: str
    executions: int
    blocks: int
    warnings: int
    avg_execution_time: float
    last_violation: Optional[datetime]


class PolicyMonitor:
    def __init__(self):
        self.metrics = {}

    def record_execution(self, rule_name, execution_time, result):
        if rule_name not in self.metrics:
            self.metrics[rule_name] = {
                'executions': 0, 'blocks': 0, 'warnings': 0,
                'total_time': 0.0, 'last_violation': None
            }

        m = self.metrics[rule_name]
        m['executions'] += 1
        m['total_time'] += execution_time

        if result.action == 'block':
            m['blocks'] += 1
            m['last_violation'] = datetime.now()
        elif result.action == 'warn':
            m['warnings'] += 1

    def get_dashboard_data(self):
        dashboard = {}
        for rule_name, data in self.metrics.items():
            dashboard[rule_name] = RuleMetrics(
                rule_name=rule_name,
                executions=data['executions'],
                blocks=data['blocks'],
                warnings=data['warnings'],
                avg_execution_time=data['total_time'] / max(data['executions'], 1),
                last_violation=data['last_violation']
            )
        return dashboard
```
**Key metrics to track:**
- Rule execution frequency (are your rules actually running?)
- Block/warn rates (too high = rules too strict, too low = rules not catching issues)
- False positive rates (measure via manual review of blocked requests)
- Performance impact (rule execution time vs total request time)
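The headline numbers fall straight out of the counters kept per rule (same dict shape as in the monitor above; the sample figures here are made up for illustration):

```python
# Derive block rate, warn rate, and average latency from per-rule counters.
# The counter shape matches what PolicyMonitor accumulates; values are illustrative.
metrics = {
    'pii_protection': {'executions': 200, 'blocks': 6, 'warnings': 18, 'total_time': 0.8},
}

for rule_name, m in metrics.items():
    block_rate = m['blocks'] / max(m['executions'], 1)
    warn_rate = m['warnings'] / max(m['executions'], 1)
    avg_ms = 1000 * m['total_time'] / max(m['executions'], 1)
    print(f"{rule_name}: block={block_rate:.1%} warn={warn_rate:.1%} avg={avg_ms:.1f}ms")
# pii_protection: block=3.0% warn=9.0% avg=4.0ms
```

A block rate that suddenly doubles, or an average execution time creeping past your latency budget, is exactly the kind of signal a dashboard alert should fire on.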
## Next Steps
You now have the blueprint for AI policies that actually enforce themselves. But reading about rules and running them in production are different beasts entirely.
Want to see this in action? Clone the Airblackbox policy examples repo and run these rules against real traffic. The repo includes:
- Complete working examples of all three rules above
- Docker Compose setup with Redis and monitoring dashboards
- Test scenarios that trigger each rule type
- Performance benchmarks for rule execution
Or jump straight into the deep end: Install Airblackbox, point it at your existing AI agents, and watch what your policies actually catch. You'll be surprised what your agents are trying to do when you're not looking.
Because the best AI governance policy is the one that runs itself. Everything else is just expensive theater.
Try Airblackbox — Because your AI agents need adult supervision, not mission statements.