Jason Shotwell

Write AI Policies That Actually Work: Custom Rule Examples

Most AI governance policies are as useful as a chocolate teapot — they look impressive in meetings but melt the moment they touch production heat.

The Problem: Your AI Policies Are Theater, Not Engineering

Here's what I see in every "AI governance" document that crosses my desk:

  • "AI systems must be fair and unbiased" (What does that mean? Fair to whom? Measured how?)
  • "Models must be regularly monitored" (How often is regular? What metrics matter?)
  • "Data privacy must be maintained" (Which data? What constitutes a violation?)

These aren't policies. They're wishes written in corporate speak.

Meanwhile, your AI agents are burning through API quotas, hallucinating customer data, and making decisions that would make a compliance officer weep. You need rules that actually fire when something goes wrong, not mission statements that make executives feel better about their AI initiatives.

The gap between "we have AI governance" and "our AI governance actually works" is filled with custom rules that trigger on specific, measurable violations. Not vibes. Not best practices. Executable code that says "this specific thing happened, therefore that specific action must be taken."
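
To make that concrete, here is a minimal sketch of how a wish like "models must be regularly monitored" might become a rule with a measurable condition and a fixed action. The `PolicyRule` dataclass, the `hours_since_last_eval` metric, and the 24-hour threshold are all illustrative assumptions, not part of Airblackbox.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class PolicyRule:
    """Hypothetical rule shape: a measurable condition plus the action to take."""
    name: str
    violated: Callable[[Dict[str, float]], bool]  # True when the rule fires
    action: str  # "block" or "warn"


# "Models must be regularly monitored" becomes:
# "an evaluation must have run within the last 24 hours, otherwise warn".
monitoring_rule = PolicyRule(
    name="eval_freshness",
    violated=lambda m: m.get("hours_since_last_eval", float("inf")) > 24,
    action="warn",
)


def check(rule: PolicyRule, measurements: Dict[str, float]) -> str:
    """Return the rule's action if it fires, otherwise "pass"."""
    return rule.action if rule.violated(measurements) else "pass"
```

A missing measurement counts as a violation here, so a monitoring job that silently stops reporting still trips the rule.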

Architecture: How Policy Rules Actually Work

Here's how Airblackbox turns your governance requirements into executable policies:

graph TD
    A[AI Agent Request] --> B[Gateway Proxy]
    B --> C[LLM Provider]
    C --> D[Response Capture]
    D --> E[Rule Engine]
    E --> F{Policy Check}
    F -->|Pass| G[Log & Forward]
    F -->|Fail| H[Block & Alert]
    F -->|Warn| I[Flag & Forward]

    J[Custom Rules] --> E
    K[EU AI Act Rules] --> E
    L[Rate Limits] --> E

    H --> M[Compliance Dashboard]
    I --> M
    G --> N[Observability Store]

    style F fill:#ff6b6b
    style J fill:#4ecdc4
    style M fill:#45b7d1

The Gateway sits between your agent and the LLM, captures everything, runs your custom rules against the traffic, and then blocks, warns, or passes each request through based on what it finds.

Think of it as a firewall, but instead of blocking ports, it blocks prompts that violate your policies.
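
The pass/warn/block decision in the diagram can be sketched in a few lines. This is an assumption about how such a check could work, not the actual Airblackbox rule engine: a rule here is any plain callable, and `run_policy_check`, `no_secrets`, and `length_warning` are hypothetical names.

```python
from typing import Callable, Iterable, List, Tuple

# Severity order: the strictest outcome wins.
SEVERITY = {"pass": 0, "warn": 1, "block": 2}

# A rule is any callable that takes the prompt and returns (action, message).
# This is a stand-in for illustration, not the Airblackbox Rule interface.
RuleFn = Callable[[str], Tuple[str, str]]


def run_policy_check(prompt: str, rules: Iterable[RuleFn]) -> Tuple[str, List[str]]:
    """Run rules in order, keeping the strictest outcome and stopping at the first block."""
    decision = "pass"
    messages: List[str] = []
    for rule in rules:
        action, message = rule(prompt)
        messages.append(message)
        if SEVERITY[action] > SEVERITY[decision]:
            decision = action
        if decision == "block":
            break  # later rules cannot change a block
    return decision, messages


def no_secrets(prompt: str) -> Tuple[str, str]:
    return ("block", "secret found") if "API_KEY=" in prompt else ("pass", "no secrets")


def length_warning(prompt: str) -> Tuple[str, str]:
    return ("warn", "long prompt") if len(prompt) > 2000 else ("pass", "length ok")
```

Stopping at the first block keeps latency down, at the cost of not reporting every rule that would have fired.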

Implementation: Building Custom Policy Rules

Let's build three real rules that solve actual problems developers face.

Rule 1: Rate Limiting by User Role

Problem: Your intern shouldn't be able to burn through the entire monthly GPT-4 budget in one afternoon experimenting with creative writing prompts.

# custom_rules/rate_limiting.py
from airblackbox.rules import Rule, RuleResult
from datetime import datetime
import redis

class UserRoleLimiter(Rule):
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)
        self.limits = {
            'intern': {'requests_per_hour': 10, 'tokens_per_day': 1000},
            'developer': {'requests_per_hour': 50, 'tokens_per_day': 10000},
            'admin': {'requests_per_hour': 200, 'tokens_per_day': 50000}
        }

    def evaluate(self, request, response, metadata):
        user_id = metadata.get('user_id')
        user_role = metadata.get('user_role', 'intern')  # Default to most restrictive

        if user_role not in self.limits:
            return RuleResult(
                passed=False,
                message=f"Unknown user role: {user_role}",
                action="block"
            )

        # Check hourly request limit
        hour_key = f"requests:{user_id}:{datetime.now().strftime('%Y%m%d%H')}"
        hourly_requests = self.redis.incr(hour_key)
        self.redis.expire(hour_key, 3600)  # Expire after 1 hour

        if hourly_requests > self.limits[user_role]['requests_per_hour']:
            return RuleResult(
                passed=False,
                message=f"User {user_id} exceeded hourly limit ({self.limits[user_role]['requests_per_hour']})",
                action="block",
                metadata={'current_requests': hourly_requests}
            )

        # Check daily token limit
        day_key = f"tokens:{user_id}:{datetime.now().strftime('%Y%m%d')}"
        current_tokens = int(self.redis.get(day_key) or 0)
        estimated_tokens = len(request.get('prompt', '')) // 4  # Rough estimation

        if current_tokens + estimated_tokens > self.limits[user_role]['tokens_per_day']:
            return RuleResult(
                passed=False,
                message=f"User {user_id} would exceed daily token limit",
                action="block",
                metadata={'current_tokens': current_tokens, 'estimated_tokens': estimated_tokens}
            )

        # Update token counter after successful validation
        self.redis.incrby(day_key, estimated_tokens)
        self.redis.expire(day_key, 86400)  # Expire after 24 hours

        return RuleResult(
            passed=True,
            message=f"Rate limit check passed for {user_role}",
            metadata={'requests_remaining': self.limits[user_role]['requests_per_hour'] - hourly_requests}
        )
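
The hourly check above is a fixed-window counter: one counter per user per clock hour. If you want to unit test that logic without a running Redis, an in-memory stand-in along these lines may help. `FixedWindowLimiter` and its injectable `clock` are hypothetical names for this sketch, and it never evicts old windows, so treat it as a test double rather than a production limiter.

```python
import time
from collections import defaultdict
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """In-memory stand-in for the Redis counters, intended for unit tests only."""

    def __init__(self, limit: int, window_seconds: int,
                 clock: Callable[[], float] = time.time) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests can control time
        self.counts: Dict[Tuple[str, int], int] = defaultdict(int)

    def allow(self, user_id: str) -> bool:
        """Count the request and report whether it is within the limit."""
        window = int(self.clock() // self.window_seconds)
        self.counts[(user_id, window)] += 1
        return self.counts[(user_id, window)] <= self.limit
```

Like the Redis version, this counts rejected requests too, so a user who keeps hammering the gateway stays blocked until the window rolls over.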

Rule 2: PII Detection and Redaction

Problem: Your customer service agent just tried to send someone's SSN to OpenAI. This is not ideal for anyone involved.

# custom_rules/pii_protection.py
import re
from airblackbox.rules import Rule, RuleResult

class PIIProtectionRule(Rule):
    def __init__(self):
        self.patterns = {
            'ssn': r'\b\d{3}-?\d{2}-?\d{4}\b',
            'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
        }

    def evaluate(self, request, response, metadata):
        prompt = request.get('prompt', '')
        violations = []
        redacted_prompt = prompt

        for pii_type, pattern in self.patterns.items():
            matches = re.finditer(pattern, prompt, re.IGNORECASE)
            for match in matches:
                # Record only the type and position; storing the matched value
                # would copy the PII into logs and dashboards
                violations.append({
                    'type': pii_type,
                    'position': match.span()
                })
                # Redact the PII
                redacted_prompt = redacted_prompt.replace(
                    match.group(), 
                    f'[REDACTED_{pii_type.upper()}]'
                )

        if violations:
            # For high-risk PII, block entirely
            high_risk = ['ssn', 'credit_card']
            if any(v['type'] in high_risk for v in violations):
                return RuleResult(
                    passed=False,
                    message=f"High-risk PII detected: {[v['type'] for v in violations]}",
                    action="block",
                    metadata={'violations': violations}
                )

            # For lower-risk PII, redact and warn
            return RuleResult(
                passed=True,
                message=f"PII detected and redacted: {[v['type'] for v in violations]}",
                action="modify",
                metadata={
                    'violations': violations,
                    'modified_prompt': redacted_prompt
                }
            )

        return RuleResult(passed=True, message="No PII detected")
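
One caveat with the patterns above: the credit card regex matches any 16-digit run, including order numbers and tracking IDs. A common way to cut those false positives is to also require a valid Luhn checksum, which real card numbers carry. The sketch below shows the idea; `luhn_valid` and `find_card_numbers` are hypothetical helpers, not part of the rule class above.

```python
import re

CARD_RE = re.compile(r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b')


def luhn_valid(number: str) -> bool:
    """Return True if the digits in `number` pass the Luhn checksum."""
    digits = [int(ch) for ch in number if ch.isdigit()]
    if not digits:
        return False
    total = 0
    # Walk from the rightmost digit, doubling every second one.
    for index, digit in enumerate(reversed(digits)):
        if index % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10 == 0


def find_card_numbers(text: str) -> list:
    """Return regex matches that also pass the Luhn check."""
    return [m.group() for m in CARD_RE.finditer(text) if luhn_valid(m.group())]
```

The checksum only filters out random digit runs. It does not prove a number is a live card, so a match should still be treated as high-risk.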

Rule 3: Content Appropriateness with Business Context

Problem: Your legal research agent keeps trying to get ChatGPT to write creative fiction about murder trials. Your lawyers are not amused.

# custom_rules/content_appropriateness.py
from airblackbox.rules import Rule, RuleResult
import openai

class ContentAppropriatenessRule(Rule):
    def __init__(self, allowed_domains=None):
        self.client = openai.OpenAI()  # For moderation API
        self.allowed_domains = allowed_domains or []

        # Business context keywords that make otherwise flagged content acceptable
        self.business_contexts = {
            'legal_research': ['case law', 'legal precedent', 'court ruling', 'litigation'],
            'security_research': ['vulnerability', 'penetration test', 'security audit'],
            'medical_research': ['clinical trial', 'medical study', 'patient care'],
            'content_moderation': ['content policy', 'moderation guidelines', 'user safety']
        }

    def evaluate(self, request, response, metadata):
        prompt = request.get('prompt', '')
        user_domain = metadata.get('user_domain', 'general')

        # Run OpenAI's moderation check
        try:
            moderation_response = self.client.moderations.create(input=prompt)
            flagged = moderation_response.results[0].flagged
            categories = moderation_response.results[0].categories
        except Exception as e:
            return RuleResult(
                passed=False,
                message=f"Moderation check failed: {str(e)}",
                action="block"
            )

        if not flagged:
            return RuleResult(passed=True, message="Content passed moderation")

        # Content is flagged, check for business context exceptions
        flagged_categories = [cat for cat, is_flagged in categories.model_dump().items() if is_flagged]

        # Apply an exception only for domains enabled via allowed_domains
        if user_domain in self.allowed_domains and user_domain in self.business_contexts:
            context_keywords = self.business_contexts[user_domain]
            if any(keyword.lower() in prompt.lower() for keyword in context_keywords):
                return RuleResult(
                    passed=True,
                    message=f"Content flagged but allowed for {user_domain} context",
                    action="warn",
                    metadata={
                        'flagged_categories': flagged_categories,
                        'business_context': user_domain,
                        'justification': 'Business context exception applied'
                    }
                )

        # No business context exception, block the request
        return RuleResult(
            passed=False,
            message=f"Content flagged for: {', '.join(flagged_categories)}",
            action="block",
            metadata={'flagged_categories': flagged_categories}
        )

Wiring It All Together

Now let's create a policy engine that runs all these rules:

# policy_engine.py
from airblackbox import Gateway
from custom_rules.rate_limiting import UserRoleLimiter
from custom_rules.pii_protection import PIIProtectionRule
from custom_rules.content_appropriateness import ContentAppropriatenessRule

# Initialize the gateway with custom rules
gateway = Gateway()

# Add your custom rules
gateway.add_rule(UserRoleLimiter())
gateway.add_rule(PIIProtectionRule())
gateway.add_rule(ContentAppropriatenessRule(
    allowed_domains=['legal_research', 'security_research']
))

# Start the gateway
if __name__ == "__main__":
    gateway.start(host="0.0.0.0", port=8080)

Pitfalls: What Will Break and How to Fix It

1. Rule Ordering Matters

Problem: Your PII redaction rule runs after your rate limiting rule, so you're counting tokens for text that gets modified.

Solution: Rules run in the order you add them. Put modification rules (like PII redaction) first, then validation rules (like rate limits).

# Wrong order
gateway.add_rule(UserRoleLimiter())  # Counts original tokens
gateway.add_rule(PIIProtectionRule())  # Then redacts

# Right order  
gateway.add_rule(PIIProtectionRule())  # Redacts first
gateway.add_rule(UserRoleLimiter())  # Counts redacted tokens
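
To see why the order changes the numbers, here is a small standalone sketch that compares the token estimate before and after redaction. It reuses the rough `len(prompt) // 4` estimate from the rate limiter; `redact_emails` and `tokens_charged` are hypothetical helpers written for this illustration.

```python
import re

EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')


def redact_emails(prompt: str) -> str:
    """Replace every email address with a fixed placeholder."""
    return EMAIL_RE.sub('[REDACTED_EMAIL]', prompt)


def estimate_tokens(prompt: str) -> int:
    return len(prompt) // 4  # same rough estimate as the rate limiter


def tokens_charged(prompt: str, redact_first: bool) -> int:
    """Tokens counted against the quota under each rule ordering."""
    text = redact_emails(prompt) if redact_first else prompt
    return estimate_tokens(text)
```

The gap can go either way: a long address shrinks to a short placeholder, while a very short one grows. Either way, only the redact-first ordering counts the text that is actually sent to the provider.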

2. Performance: Death by a Thousand Cuts

Problem: You added 20 rules that each make external API calls. Your response time went from 200ms to 5 seconds.

Solution: Cache expensive operations and use async where possible:

from functools import lru_cache
import asyncio

from airblackbox.rules import Rule

class OptimizedPIIRule(Rule):
    # _find_pii_violations and _build_result are helper methods not shown here

    @lru_cache(maxsize=1000)
    def _check_patterns(self, text):
        # lru_cache already keys on the text, so no separate hash argument is needed.
        # Caching a method also keeps `self` alive for the lifetime of the cache.
        return self._find_pii_violations(text)

    async def evaluate_async(self, request, response, metadata):
        # Run the regex scan in a worker thread so the event loop is not blocked
        prompt = request.get('prompt', '')
        violations = await asyncio.to_thread(self._check_patterns, prompt)
        return self._build_result(violations)

3. State Management in Distributed Systems

Problem: You're running multiple gateway instances behind a load balancer. Rate limits work inconsistently because each instance has its own Redis connection and state.

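Solution: Point every gateway instance at the same shared Redis deployment instead of a local one. Redis Sentinel, used here, adds automatic failover on top: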

import redis.sentinel
from airblackbox.rules import Rule

class DistributedUserRoleLimiter(Rule):
    def __init__(self):
        # Use Redis Sentinel for high availability
        sentinel = redis.sentinel.Sentinel([
            ('localhost', 26379),
            ('localhost', 26380),
            ('localhost', 26381)
        ])
        self.redis = sentinel.master_for('mymaster', socket_timeout=0.1)

Measurement: How to Know It's Working

Your rules are only as good as your ability to measure their effectiveness. Here's how to build observability into your policy engine:

# monitoring.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class RuleMetrics:
    rule_name: str
    executions: int
    blocks: int
    warnings: int
    avg_execution_time: float
    last_violation: Optional[datetime]  # None until the rule first blocks a request

class PolicyMonitor:
    def __init__(self):
        self.metrics = {}

    def record_execution(self, rule_name, execution_time, result):
        if rule_name not in self.metrics:
            self.metrics[rule_name] = {
                'executions': 0, 'blocks': 0, 'warnings': 0,
                'total_time': 0, 'last_violation': None
            }

        m = self.metrics[rule_name]
        m['executions'] += 1
        m['total_time'] += execution_time

        if result.action == 'block':
            m['blocks'] += 1
            m['last_violation'] = datetime.now()
        elif result.action == 'warn':
            m['warnings'] += 1

    def get_dashboard_data(self):
        dashboard = {}
        for rule_name, data in self.metrics.items():
            dashboard[rule_name] = RuleMetrics(
                rule_name=rule_name,
                executions=data['executions'],
                blocks=data['blocks'],
                warnings=data['warnings'],
                avg_execution_time=data['total_time'] / max(data['executions'], 1),
                last_violation=data['last_violation']
            )
        return dashboard

Key metrics to track:

  • Rule execution frequency (are your rules actually running?)
  • Block/warn rates (too high = rules too strict, too low = rules not catching issues)
  • False positive rates (measure via manual review of blocked requests)
  • Performance impact (rule execution time vs total request time)
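
As a rough sketch of how the rates in this list could be computed from raw counts, assuming you record how many blocked requests were manually reviewed and how many of those turned out to be legitimate. The `RuleCounts` fields are hypothetical and would need to be fed from your own review process.

```python
from dataclasses import dataclass


@dataclass
class RuleCounts:
    """Hypothetical per-rule counters collected over a review period."""
    executions: int
    blocks: int
    warnings: int
    reviewed_blocks: int   # blocked requests a human has reviewed
    false_positives: int   # reviewed blocks judged to be legitimate requests


def block_rate(c: RuleCounts) -> float:
    """Share of executions that ended in a block."""
    return c.blocks / c.executions if c.executions else 0.0


def warn_rate(c: RuleCounts) -> float:
    """Share of executions that ended in a warning."""
    return c.warnings / c.executions if c.executions else 0.0


def false_positive_rate(c: RuleCounts) -> float:
    """Share of manually reviewed blocks that should not have been blocked."""
    return c.false_positives / c.reviewed_blocks if c.reviewed_blocks else 0.0
```

The false positive rate is only as good as the review sample behind it, so it is worth reporting `reviewed_blocks` alongside the rate.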

Next Steps

You now have the blueprint for AI policies that actually enforce themselves. But reading about rules and running them in production are different beasts entirely.

Want to see this in action? Clone the Airblackbox policy examples repo and run these rules against real traffic. The repo includes:

  • Complete working examples of all three rules above
  • Docker Compose setup with Redis and monitoring dashboards
  • Test scenarios that trigger each rule type
  • Performance benchmarks for rule execution

Or jump straight into the deep end: Install Airblackbox, point it at your existing AI agents, and watch what your policies actually catch. You'll be surprised what your agents are trying to do when you're not looking.

Because the best AI governance policy is the one that runs itself. Everything else is just expensive theater.


Try Airblackbox — Because your AI agents need adult supervision, not mission statements.