Why traditional access control fails for AI agents, and how relationship-based authorization provides a path forward.
The first time I watched an AI agent autonomously chain together twelve API calls to complete a task, I felt two things simultaneously: excitement at the capability, and dread at the security implications.
The agent had been asked to "prepare the weekly team update." It read from Linear, queried our metrics dashboard, pulled context from Slack, drafted content, and posted to Notion. Twelve tools. Thirty seconds. Zero authorization checks beyond the initial OAuth tokens we'd configured at setup.
Every one of those tokens had broad permissions. The agent could have read any Linear ticket, any Slack channel, modified any Notion page. We'd built an incredibly capable system with the security model of a shared password on a sticky note.
This is the state of authorization in agentic AI today. And it's a problem we need to solve before these systems become critical infrastructure.
The Authorization model mismatch
Traditional authorization was designed for a straightforward interaction pattern:
Human → Action → Resource
A user clicks "delete," the system checks if they have the delete permission on that resource, and the action proceeds or fails. The human is present, the action is discrete, and the permission check is synchronous.
Agentic AI breaks every assumption in this model:
Human → Agent → [Plan] → Tool₁ → Tool₂ → ... → Toolₙ → Resources
The human initiates a goal, not an action. The agent autonomously decides which actions to take, in what sequence, on which resources. The human may not even know what tools will be invoked until after execution completes.
This creates three fundamental problems that traditional Role-Based Access Control (RBAC) cannot address.
Problem 1: Delegation without boundaries
When you grant an agent access to your email, what are you actually authorizing?
In most current implementations, you're handing over an OAuth token with scopes like gmail.readonly or gmail.compose. The agent now has the same access you do—to all your emails, regardless of what task you asked it to perform.
Ask the agent to "summarize emails from last week" and it could technically read emails from three years ago, from your confidential HR folder. Nothing in the authorization model prevents this. We're relying on the model's alignment to stay within bounds—a strategy that works until it doesn't.
Problem 2: Action sequences create emergent risk
Consider two individually innocuous permissions:
- Read calendar events
- Send Slack messages
An agent with both permissions could:
- Read your calendar
- Find a meeting titled "Confidential: Q4 Acquisition Discussion"
- Post that meeting's details to a public Slack channel
Each permission check passes. The combination creates a data exfiltration pathway that neither permission individually reveals. RBAC has no mechanism to reason about action sequences or their emergent risks.
Problem 3: Temporal and contextual blindness
Permissions in RBAC are static. You have a role, that role has permissions, those permissions exist until revoked.
But agent authorization should be:
- Task-scoped: Valid only for the current task, not indefinitely
- Time-bound: Expire after the task completes or a timeout
- Context-aware: Different permissions based on what the user asked
- Instantly revocable: User says "stop" and all access terminates
A role like agent-assistant with permissions [read:documents, write:documents, read:calendar] captures none of this nuance. The agent has those permissions whether it's summarizing a document or doing something the user never requested.
Why Relationship-Based access control fits
Relationship-Based Access Control (ReBAC) models authorization as a graph of relationships between entities. Instead of asking "does this role have this permission?", ReBAC asks "does a path exist between this actor and this resource through authorized relationships?"
This graph-based model maps naturally to agentic authorization:
user:alice
└── delegated_to → agent:session-123
└── for_task → task:weekly-update
├── can_read → linear:project-eng
├── can_read → slack:channel-team
└── can_write → notion:page-updates
The relationships encode:
- Who delegated authority (alice)
- To whom (agent session 123)
- For what purpose (the weekly-update task)
- With what scope (specific resources, specific operations)
Revocation is simple: delete the delegated_to relationship, and all downstream access disappears. The graph structure makes this transitive by design.
Building the authorization model
Let's build a concrete authorization model for agentic systems using OpenFGA, an open-source ReBAC implementation. I'll walk through the model design, implementation patterns, and integration architecture.
The type system
First, we define the entities and relationships in our authorization model:
# OpenFGA Authorization Model (DSL format)
model
schema 1.1
type user
type agent
relations
define owner: [user]
define active_session: [user]
type task
relations
define delegator: [user]
define assignee: [agent]
define can_access: [resource]
type resource
relations
define owner: [user]
define reader: [user, agent, task]
define writer: [user, agent, task]
type tool
relations
define can_invoke: [user, agent, task]
This model establishes:
- Users own agents and resources
- Agents have owners and track active sessions
- Tasks are the unit of delegation—they connect users, agents, and accessible resources
- Resources can grant read/write access to users, agents, or tasks
- Tools can be invoked by users, agents, or scoped to specific tasks
The delegation pattern
When a user initiates an agent task, we create a task object that acts as a permission boundary:
from openfga_sdk import OpenFgaClient, ClientConfiguration
from openfga_sdk.models import ClientTuple
import uuid
from datetime import datetime, timedelta
class AgentAuthorizationService:
def __init__(self, openfga_client: OpenFgaClient, store_id: str):
self.client = openfga_client
self.store_id = store_id
async def create_task_delegation(
self,
user_id: str,
agent_id: str,
task_description: str,
allowed_resources: list[dict],
ttl_minutes: int = 30
) -> str:
"""
Create a task-scoped delegation from user to agent.
This establishes a permission boundary: the agent can only
access resources explicitly linked to this task.
"""
task_id = f"task:{uuid.uuid4()}"
expires_at = datetime.utcnow() + timedelta(minutes=ttl_minutes)
tuples = [
# User delegates to this task
ClientTuple(
user=f"user:{user_id}",
relation="delegator",
object=task_id
),
# Agent is assigned to this task
ClientTuple(
user=f"agent:{agent_id}",
relation="assignee",
object=task_id
),
]
# Scope specific resources to this task
for resource in allowed_resources:
resource_id = resource["id"]
access_level = resource.get("access", "reader")
tuples.append(
ClientTuple(
user=task_id,
relation=access_level,
object=f"resource:{resource_id}"
)
)
await self.client.write(
body={"writes": {"tuple_keys": tuples}},
options={"store_id": self.store_id}
)
# Store task metadata (for expiration handling)
await self._store_task_metadata(task_id, {
"user_id": user_id,
"agent_id": agent_id,
"description": task_description,
"expires_at": expires_at.isoformat(),
"created_at": datetime.utcnow().isoformat()
})
return task_id
The critical insight here: the agent doesn't receive broad permissions. It receives assignment to a task, and that task has specific resource access. The agent's authority is bounded by the task's scope.
Checking authorization
Before any agent action, we verify authorization through the task relationship:
async def check_agent_resource_access(
self,
agent_id: str,
task_id: str,
resource_id: str,
access_type: str = "reader"
) -> tuple[bool, str]:
"""
Check if an agent can access a resource within a task context.
Returns (authorized: bool, reason: str)
"""
# First: Is this agent assigned to this task?
agent_assigned = await self.client.check(
body={
"tuple_key": {
"user": f"agent:{agent_id}",
"relation": "assignee",
"object": task_id
}
},
options={"store_id": self.store_id}
)
if not agent_assigned.allowed:
return False, "Agent not assigned to this task"
# Second: Does this task have access to this resource?
task_has_access = await self.client.check(
body={
"tuple_key": {
"user": task_id,
"relation": access_type,
"object": f"resource:{resource_id}"
}
},
options={"store_id": self.store_id}
)
if not task_has_access.allowed:
return False, f"Task does not have {access_type} access to resource"
return True, "Authorized"
This two-step check ensures:
- The agent is legitimately working on this task
- The task has been granted access to this resource
An agent can't access resources outside its assigned task, even if it has accessed those resources in previous tasks.
Revocation That Actually Works
When a user cancels a task or the task expires, all derived permissions must terminate:
async def revoke_task(self, task_id: str) -> dict:
"""
Revoke a task and all its associated permissions.
This is where ReBAC shines: deleting the task relationships
cascades to remove all resource access.
"""
# Read all tuples where this task is involved
related_tuples = await self.client.read(
body={"tuple_key": {"object": task_id}},
options={"store_id": self.store_id}
)
# Also get tuples where task is the user (accessing resources)
task_access_tuples = await self.client.read(
body={"tuple_key": {"user": task_id}},
options={"store_id": self.store_id}
)
all_tuples = (
related_tuples.tuples +
task_access_tuples.tuples
)
if all_tuples:
await self.client.write(
body={
"deletes": {
"tuple_keys": [
{
"user": t.key.user,
"relation": t.key.relation,
"object": t.key.object
}
for t in all_tuples
]
}
},
options={"store_id": self.store_id}
)
await self._delete_task_metadata(task_id)
return {
"task_id": task_id,
"tuples_revoked": len(all_tuples),
"status": "revoked"
}
Compare this to RBAC revocation, where you'd need to track every permission granted, remember which ones were for this task versus others, and selectively revoke. The relationship graph makes the task a natural permission boundary that can be deleted atomically.
The authorization gateway
Individual authorization checks aren't enough. We need a gateway that sits between the agent and all external tools/resources, enforcing authorization on every action:
from typing import Callable, Any
from functools import wraps
import asyncio
class AuthorizationGateway:
"""
Gateway that wraps all agent tool calls with authorization checks.
"""
def __init__(self, auth_service: AgentAuthorizationService):
self.auth_service = auth_service
self.audit_log = []
def authorized_tool(
self,
resource_extractor: Callable[[dict], str],
access_type: str = "reader"
):
"""
Decorator that wraps a tool function with authorization.
Args:
resource_extractor: Function to extract resource ID from tool args
access_type: Required access level (reader, writer)
"""
def decorator(tool_func: Callable):
@wraps(tool_func)
async def wrapper(
agent_id: str,
task_id: str,
**kwargs
) -> Any:
resource_id = resource_extractor(kwargs)
# Check authorization
authorized, reason = await self.auth_service.check_agent_resource_access(
agent_id=agent_id,
task_id=task_id,
resource_id=resource_id,
access_type=access_type
)
# Audit logging (always, regardless of outcome)
audit_entry = {
"timestamp": datetime.utcnow().isoformat(),
"agent_id": agent_id,
"task_id": task_id,
"tool": tool_func.__name__,
"resource_id": resource_id,
"access_type": access_type,
"authorized": authorized,
"reason": reason
}
self.audit_log.append(audit_entry)
if not authorized:
raise AuthorizationError(
f"Unauthorized: {reason}",
audit_entry=audit_entry
)
# Execute the actual tool
return await tool_func(**kwargs)
return wrapper
return decorator
class AuthorizationError(Exception):
def __init__(self, message: str, audit_entry: dict):
super().__init__(message)
self.audit_entry = audit_entry
Now we can wrap our tools:
gateway = AuthorizationGateway(auth_service)
@gateway.authorized_tool(
resource_extractor=lambda args: args["document_id"],
access_type="reader"
)
async def read_document(document_id: str) -> str:
"""Read a document from the document store."""
return await document_store.get(document_id)
@gateway.authorized_tool(
resource_extractor=lambda args: args["document_id"],
access_type="writer"
)
async def update_document(document_id: str, content: str) -> bool:
"""Update a document in the document store."""
return await document_store.update(document_id, content)
@gateway.authorized_tool(
resource_extractor=lambda args: f"slack:{args['channel_id']}",
access_type="writer"
)
async def post_to_slack(channel_id: str, message: str) -> bool:
"""Post a message to a Slack channel."""
return await slack_client.post_message(channel_id, message)
Every tool invocation now passes through authorization. The agent can't bypass it—the tools simply don't execute without valid authorization context.
Handling scope inference
One challenge remains: how do we determine which resources a task should have access to? The user says "summarize my emails from last week"—we need to translate that into specific permission grants.
This requires a scope inference layer that runs before task creation:
from anthropic import Anthropic
class ScopeInferenceService:
"""
Infer required resource scopes from natural language task descriptions.
"""
SCOPE_INFERENCE_PROMPT = """Analyze the user's request and determine the minimal required resource access.
User request: {request}
Available resource types:
- email: Gmail messages (scopes: read, send, delete)
- calendar: Google Calendar (scopes: read, write)
- documents: Google Docs (scopes: read, write)
- slack: Slack channels (scopes: read, write)
- linear: Linear issues (scopes: read, write)
Output a JSON object with:
{{
"resources": [
{{
"type": "resource_type",
"id": "specific_id or pattern",
"access": "read or write",
"constraints": {{
"time_range": "if applicable",
"filters": ["any filters"]
}}
}}
],
"reasoning": "brief explanation of why these resources are needed"
}}
Be minimal: only include resources strictly necessary for the task.
Prefer read access over write access unless modification is explicitly requested."""
def __init__(self, anthropic_client: Anthropic):
self.client = anthropic_client
async def infer_scopes(
self,
user_request: str,
available_resources: list[dict]
) -> dict:
"""
Infer minimal required scopes from a natural language request.
"""
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{
"role": "user",
"content": self.SCOPE_INFERENCE_PROMPT.format(
request=user_request
)
}]
)
# Parse the response
import json
scope_text = response.content[0].text
# Extract JSON from response
start = scope_text.find('{')
end = scope_text.rfind('}') + 1
scope_data = json.loads(scope_text[start:end])
# Validate against available resources
validated_resources = []
for resource in scope_data["resources"]:
if self._resource_available(resource, available_resources):
validated_resources.append(resource)
return {
"resources": validated_resources,
"reasoning": scope_data.get("reasoning", ""),
"original_request": user_request
}
def _resource_available(
self,
requested: dict,
available: list[dict]
) -> bool:
"""Check if a requested resource is in the available set."""
for avail in available:
if (avail["type"] == requested["type"] and
self._id_matches(requested["id"], avail["id"])):
return True
return False
def _id_matches(self, requested_id: str, available_id: str) -> bool:
"""Check if a requested resource ID matches an available one."""
if requested_id == available_id:
return True
if "*" in available_id:
pattern = available_id.replace("*", ".*")
import re
return bool(re.match(pattern, requested_id))
return False
The complete flow becomes:
async def initiate_agent_task(
user_id: str,
agent_id: str,
user_request: str
) -> dict:
"""
Complete flow: user request → scope inference → task creation → agent execution
"""
# 1. Get user's available resources
user_resources = await get_user_resources(user_id)
# 2. Infer minimal required scopes
scope_service = ScopeInferenceService(anthropic_client)
inferred_scopes = await scope_service.infer_scopes(
user_request=user_request,
available_resources=user_resources
)
# 3. Create task with scoped permissions
auth_service = AgentAuthorizationService(openfga_client, store_id)
task_id = await auth_service.create_task_delegation(
user_id=user_id,
agent_id=agent_id,
task_description=user_request,
allowed_resources=inferred_scopes["resources"],
ttl_minutes=30
)
# 4. Return task context for agent execution
return {
"task_id": task_id,
"scopes": inferred_scopes,
"agent_id": agent_id,
"status": "ready"
}
Production considerations
TTL and automatic expiration
Tasks should expire automatically. Implement a background job that cleans up expired tasks:
async def cleanup_expired_tasks():
"""
Background job to revoke expired tasks.
Run every minute via scheduler.
"""
expired_tasks = await get_expired_task_ids()
for task_id in expired_tasks:
try:
await auth_service.revoke_task(task_id)
logger.info(f"Revoked expired task: {task_id}")
except Exception as e:
logger.error(f"Failed to revoke task {task_id}: {e}")
Audit trail for compliance
Every authorization decision should be logged for audit:
@dataclass
class AuditEvent:
timestamp: datetime
event_type: str # "delegation_created", "access_checked", "access_denied", "task_revoked"
user_id: str
agent_id: str
task_id: str
resource_id: Optional[str]
decision: str # "allowed", "denied"
reason: str
metadata: dict
async def log_audit_event(event: AuditEvent):
"""Log to your audit system of choice."""
await audit_store.write(event)
Performance: Caching Authorization Decisions
Authorization checks happen on every tool call. For performance, implement caching with appropriate invalidation:
from functools import lru_cache
import hashlib
class CachedAuthorizationService(AgentAuthorizationService):
def __init__(self, *args, cache_ttl_seconds: int = 60, **kwargs):
super().__init__(*args, **kwargs)
self.cache = {}
self.cache_ttl = cache_ttl_seconds
async def check_agent_resource_access(
self,
agent_id: str,
task_id: str,
resource_id: str,
access_type: str = "reader"
) -> tuple[bool, str]:
cache_key = f"{agent_id}:{task_id}:{resource_id}:{access_type}"
# Check cache
if cache_key in self.cache:
entry = self.cache[cache_key]
if datetime.utcnow() < entry["expires_at"]:
return entry["result"]
# Cache miss - perform actual check
result = await super().check_agent_resource_access(
agent_id, task_id, resource_id, access_type
)
# Cache the result (shorter TTL for denials)
ttl = self.cache_ttl if result[0] else 10
self.cache[cache_key] = {
"result": result,
"expires_at": datetime.utcnow() + timedelta(seconds=ttl)
}
return result
def invalidate_task_cache(self, task_id: str):
"""Invalidate all cache entries for a task."""
keys_to_delete = [
k for k in self.cache.keys()
if task_id in k
]
for key in keys_to_delete:
del self.cache[key]
The Bigger Picture
This authorization model is a foundation, not a complete solution. Real-world deployments will need:
Chain-of-thought authorization: Validating not just individual tool calls, but sequences of calls that might create emergent risks. This requires pattern detection on action sequences—a topic for a future post.
User confirmation flows: For high-risk operations, the authorization system should pause execution and request explicit user confirmation rather than auto-allowing based on inferred scopes.
Cross-agent authorization: When agents delegate to other agents (increasingly common in multi-agent systems), the delegation chain needs to preserve authorization context and enforce attenuation—each hop can only reduce permissions, never expand them.
Federated authorization: As AI agents operate across organizational boundaries, we'll need standards for expressing and verifying delegated authority across trust domains.
Closing thoughts
We're at an inflection point with agentic AI. The capabilities are advancing faster than the security models to contain them. Every week brings new frameworks for building agents, new tools for them to invoke, new integrations to connect—and almost none of it comes with authorization baked in.
The patterns in this post aren't theoretical. They're the minimum viable security for any production agent system. The task-scoped delegation model, the authorization gateway, the audit trail—these should be table stakes, not advanced features.
The good news: the building blocks exist. OpenFGA and similar ReBAC systems provide the authorization primitives. The patterns are portable across implementations. What's missing is adoption.
If you're building agentic systems, I'd encourage you to implement authorization from day one. Retrofitting it later is painful, and the security risks of uncontrolled agents are too significant to defer.
The age of autonomous AI systems is here. Let's make sure they operate within appropriate boundaries.
Found this useful? I write about AI infrastructure, security, and the engineering challenges of building production AI systems. Connect with me on LinkedIn or Twitter/X.
The code examples in this post are available on GitHub.
Top comments (0)