DEV Community

Siddhant Khare
Securing Agentic AI: authorization patterns for autonomous systems

Why traditional access control fails for AI agents, and how relationship-based authorization provides a path forward.


The first time I watched an AI agent autonomously chain together twelve API calls to complete a task, I felt two things simultaneously: excitement at the capability, and dread at the security implications.

The agent had been asked to "prepare the weekly team update." It read from Linear, queried our metrics dashboard, pulled context from Slack, drafted content, and posted to Notion. Twelve tools. Thirty seconds. Zero authorization checks beyond the initial OAuth tokens we'd configured at setup.

Every one of those tokens carried broad permissions. The agent could have read any Linear ticket or any Slack channel, and modified any Notion page. We'd built an incredibly capable system with the security model of a shared password on a sticky note.

This is the state of authorization in agentic AI today. And it's a problem we need to solve before these systems become critical infrastructure.


The authorization model mismatch

Traditional authorization was designed for a straightforward interaction pattern:

Human → Action → Resource

A user clicks "delete," the system checks if they have the delete permission on that resource, and the action proceeds or fails. The human is present, the action is discrete, and the permission check is synchronous.

Agentic AI breaks every assumption in this model:

Human → Agent → [Plan] → Tool₁ → Tool₂ → ... → Toolₙ → Resources

The human initiates a goal, not an action. The agent autonomously decides which actions to take, in what sequence, on which resources. The human may not even know what tools will be invoked until after execution completes.

This creates three fundamental problems that traditional Role-Based Access Control (RBAC) cannot address.

Problem 1: Delegation without boundaries

When you grant an agent access to your email, what are you actually authorizing?

In most current implementations, you're handing over an OAuth token with scopes like gmail.readonly or gmail.compose. The agent now has the same access you do—to all your emails, regardless of what task you asked it to perform.

Ask the agent to "summarize emails from last week" and it could technically read emails from three years ago, from your confidential HR folder. Nothing in the authorization model prevents this. We're relying on the model's alignment to stay within bounds—a strategy that works until it doesn't.

Problem 2: Action sequences create emergent risk

Consider two individually innocuous permissions:

  • Read calendar events
  • Send Slack messages

An agent with both permissions could:

  1. Read your calendar
  2. Find a meeting titled "Confidential: Q4 Acquisition Discussion"
  3. Post that meeting's details to a public Slack channel

Each permission check passes. The combination creates a data exfiltration pathway that neither permission individually reveals. RBAC has no mechanism to reason about action sequences or their emergent risks.

Problem 3: Temporal and contextual blindness

Permissions in RBAC are static. You have a role, that role has permissions, those permissions exist until revoked.

But agent authorization should be:

  • Task-scoped: Valid only for the current task, not indefinitely
  • Time-bound: Expire after the task completes or a timeout
  • Context-aware: Different permissions based on what the user asked
  • Instantly revocable: User says "stop" and all access terminates

A role like agent-assistant with permissions [read:documents, write:documents, read:calendar] captures none of this nuance. The agent has those permissions whether it's summarizing a document or doing something the user never requested.
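These four properties can be sketched as a minimal grant record (illustrative only; the class name and fields here are my own, and the rest of this post builds the real version on OpenFGA):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

@dataclass
class TaskGrant:
    task_id: str
    resources: dict[str, str]  # resource id -> "read" | "write"
    expires_at: datetime
    revoked: bool = False

    def allows(self, resource_id: str, access: str) -> bool:
        # Task-scoped, time-bound, and instantly revocable in one check
        if self.revoked or datetime.now(timezone.utc) >= self.expires_at:
            return False
        return self.resources.get(resource_id) == access

grant = TaskGrant(
    task_id="task:summarize-docs",
    resources={"doc-42": "read"},
    expires_at=datetime.now(timezone.utc) + timedelta(minutes=30),
)
ok = grant.allows("doc-42", "read")            # within scope: True
too_broad = grant.allows("doc-42", "write")    # never granted: False
grant.revoked = True                           # user says "stop"
after_revoke = grant.allows("doc-42", "read")  # False
```

The point is that the grant, not the agent, carries the permissions, so expiry and revocation are properties of the grant object rather than of long-lived roles.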


Why relationship-based access control fits

Relationship-Based Access Control (ReBAC) models authorization as a graph of relationships between entities. Instead of asking "does this role have this permission?", ReBAC asks "does a path exist between this actor and this resource through authorized relationships?"

This graph-based model maps naturally to agentic authorization:

user:alice
  └── delegated_to → agent:session-123
                        └── for_task → task:weekly-update
                                          ├── can_read → linear:project-eng
                                          ├── can_read → slack:channel-team
                                          └── can_write → notion:page-updates

The relationships encode:

  • Who delegated authority (alice)
  • To whom (agent session 123)
  • For what purpose (the weekly-update task)
  • With what scope (specific resources, specific operations)

Revocation is simple: delete the delegated_to relationship, and all downstream access disappears. The graph structure makes this transitive by design.
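To make the path check and transitive revocation concrete, here's a toy in-memory sketch (plain Python sets, not OpenFGA; the identifiers follow the graph above):

```python
# Tuples are (subject, relation, object), mirroring the delegation graph.
tuples = {
    ("user:alice", "delegated_to", "agent:session-123"),
    ("agent:session-123", "for_task", "task:weekly-update"),
    ("task:weekly-update", "can_read", "linear:project-eng"),
    ("task:weekly-update", "can_write", "notion:page-updates"),
}

def agent_can(agent: str, relation: str, resource: str) -> bool:
    """True only if a path user -> agent -> task -> resource exists."""
    if not any(r == "delegated_to" and o == agent for (_, r, o) in tuples):
        return False  # nobody has delegated authority to this agent
    tasks = {o for (s, r, o) in tuples if s == agent and r == "for_task"}
    return any((t, relation, resource) in tuples for t in tasks)

before = agent_can("agent:session-123", "can_read", "linear:project-eng")  # True

# Revoke: delete the single delegated_to edge; all downstream access dies.
tuples.discard(("user:alice", "delegated_to", "agent:session-123"))
after = agent_can("agent:session-123", "can_read", "linear:project-eng")   # False
```

One deleted edge severs every path through it, which is exactly the cascading revocation behavior we want.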


Building the authorization model

Let's build a concrete authorization model for agentic systems using OpenFGA, an open-source ReBAC implementation. I'll walk through the model design, implementation patterns, and integration architecture.

The type system

First, we define the entities and relationships in our authorization model:

# OpenFGA Authorization Model (DSL format)
model
  schema 1.1

type user

type agent
  relations
    define owner: [user]
    define active_session: [user]

type task
  relations
    define delegator: [user]
    define assignee: [agent]
    define can_access: [resource]

type resource
  relations
    define owner: [user]
    define reader: [user, agent, task]
    define writer: [user, agent, task]

type tool
  relations
    define can_invoke: [user, agent, task]

This model establishes:

  • Users own agents and resources
  • Agents have owners and track active sessions
  • Tasks are the unit of delegation—they connect users, agents, and accessible resources
  • Resources can grant read/write access to users, agents, or tasks
  • Tools can be invoked by users, agents, or scoped to specific tasks

The delegation pattern

When a user initiates an agent task, we create a task object that acts as a permission boundary:

from openfga_sdk import OpenFgaClient, ClientConfiguration
from openfga_sdk.models import ClientTuple
import uuid
from datetime import datetime, timedelta

class AgentAuthorizationService:
    def __init__(self, openfga_client: OpenFgaClient, store_id: str):
        self.client = openfga_client
        self.store_id = store_id

    async def create_task_delegation(
        self,
        user_id: str,
        agent_id: str,
        task_description: str,
        allowed_resources: list[dict],
        ttl_minutes: int = 30
    ) -> str:
        """
        Create a task-scoped delegation from user to agent.

        This establishes a permission boundary: the agent can only
        access resources explicitly linked to this task.
        """
        task_id = f"task:{uuid.uuid4()}"
        expires_at = datetime.utcnow() + timedelta(minutes=ttl_minutes)

        tuples = [
            # User delegates to this task
            ClientTuple(
                user=f"user:{user_id}",
                relation="delegator",
                object=task_id
            ),
            # Agent is assigned to this task
            ClientTuple(
                user=f"agent:{agent_id}",
                relation="assignee",
                object=task_id
            ),
        ]

        # Scope specific resources to this task
        for resource in allowed_resources:
            resource_id = resource["id"]
            access_level = resource.get("access", "reader")

            tuples.append(
                ClientTuple(
                    user=task_id,
                    relation=access_level,
                    object=f"resource:{resource_id}"
                )
            )

        await self.client.write(
            body={"writes": {"tuple_keys": tuples}},
            options={"store_id": self.store_id}
        )

        # Store task metadata (for expiration handling)
        await self._store_task_metadata(task_id, {
            "user_id": user_id,
            "agent_id": agent_id,
            "description": task_description,
            "expires_at": expires_at.isoformat(),
            "created_at": datetime.utcnow().isoformat()
        })

        return task_id

The critical insight here: the agent doesn't receive broad permissions. It receives assignment to a task, and that task has specific resource access. The agent's authority is bounded by the task's scope.

Checking authorization

Before any agent action, we verify authorization through the task relationship:

# Method of AgentAuthorizationService, continued from above
async def check_agent_resource_access(
    self,
    agent_id: str,
    task_id: str,
    resource_id: str,
    access_type: str = "reader"
) -> tuple[bool, str]:
    """
    Check if an agent can access a resource within a task context.

    Returns (authorized: bool, reason: str)
    """
    # First: Is this agent assigned to this task?
    agent_assigned = await self.client.check(
        body={
            "tuple_key": {
                "user": f"agent:{agent_id}",
                "relation": "assignee",
                "object": task_id
            }
        },
        options={"store_id": self.store_id}
    )

    if not agent_assigned.allowed:
        return False, "Agent not assigned to this task"

    # Second: Does this task have access to this resource?
    task_has_access = await self.client.check(
        body={
            "tuple_key": {
                "user": task_id,
                "relation": access_type,
                "object": f"resource:{resource_id}"
            }
        },
        options={"store_id": self.store_id}
    )

    if not task_has_access.allowed:
        return False, f"Task does not have {access_type} access to resource"

    return True, "Authorized"

This two-step check ensures:

  1. The agent is legitimately working on this task
  2. The task has been granted access to this resource

An agent can't access resources outside its assigned task, even if it has accessed those resources in previous tasks.

Revocation that actually works

When a user cancels a task or the task expires, all derived permissions must terminate:

# Method of AgentAuthorizationService, continued from above
async def revoke_task(self, task_id: str) -> dict:
    """
    Revoke a task and all its associated permissions.

    This is where ReBAC shines: deleting the task relationships
    cascades to remove all resource access.
    """
    # Read all tuples where this task is involved
    related_tuples = await self.client.read(
        body={"tuple_key": {"object": task_id}},
        options={"store_id": self.store_id}
    )

    # Also get tuples where task is the user (accessing resources)
    task_access_tuples = await self.client.read(
        body={"tuple_key": {"user": task_id}},
        options={"store_id": self.store_id}
    )

    all_tuples = (
        related_tuples.tuples + 
        task_access_tuples.tuples
    )

    if all_tuples:
        await self.client.write(
            body={
                "deletes": {
                    "tuple_keys": [
                        {
                            "user": t.key.user,
                            "relation": t.key.relation,
                            "object": t.key.object
                        }
                        for t in all_tuples
                    ]
                }
            },
            options={"store_id": self.store_id}
        )

    await self._delete_task_metadata(task_id)

    return {
        "task_id": task_id,
        "tuples_revoked": len(all_tuples),
        "status": "revoked"
    }

Compare this to RBAC revocation, where you'd need to track every permission granted, remember which ones were for this task versus others, and selectively revoke. The relationship graph makes the task a natural permission boundary that can be deleted atomically.


The authorization gateway

Individual authorization checks aren't enough. We need a gateway that sits between the agent and all external tools/resources, enforcing authorization on every action:

from typing import Callable, Any
from functools import wraps
from datetime import datetime

class AuthorizationGateway:
    """
    Gateway that wraps all agent tool calls with authorization checks.
    """

    def __init__(self, auth_service: AgentAuthorizationService):
        self.auth_service = auth_service
        self.audit_log = []

    def authorized_tool(
        self,
        resource_extractor: Callable[[dict], str],
        access_type: str = "reader"
    ):
        """
        Decorator that wraps a tool function with authorization.

        Args:
            resource_extractor: Function to extract resource ID from tool args
            access_type: Required access level (reader, writer)
        """
        def decorator(tool_func: Callable):
            @wraps(tool_func)
            async def wrapper(
                agent_id: str,
                task_id: str,
                **kwargs
            ) -> Any:
                resource_id = resource_extractor(kwargs)

                # Check authorization
                authorized, reason = await self.auth_service.check_agent_resource_access(
                    agent_id=agent_id,
                    task_id=task_id,
                    resource_id=resource_id,
                    access_type=access_type
                )

                # Audit logging (always, regardless of outcome)
                audit_entry = {
                    "timestamp": datetime.utcnow().isoformat(),
                    "agent_id": agent_id,
                    "task_id": task_id,
                    "tool": tool_func.__name__,
                    "resource_id": resource_id,
                    "access_type": access_type,
                    "authorized": authorized,
                    "reason": reason
                }
                self.audit_log.append(audit_entry)

                if not authorized:
                    raise AuthorizationError(
                        f"Unauthorized: {reason}",
                        audit_entry=audit_entry
                    )

                # Execute the actual tool
                return await tool_func(**kwargs)

            return wrapper
        return decorator


class AuthorizationError(Exception):
    def __init__(self, message: str, audit_entry: dict):
        super().__init__(message)
        self.audit_entry = audit_entry

Now we can wrap our tools:

gateway = AuthorizationGateway(auth_service)

@gateway.authorized_tool(
    resource_extractor=lambda args: args["document_id"],
    access_type="reader"
)
async def read_document(document_id: str) -> str:
    """Read a document from the document store."""
    return await document_store.get(document_id)


@gateway.authorized_tool(
    resource_extractor=lambda args: args["document_id"],
    access_type="writer"
)
async def update_document(document_id: str, content: str) -> bool:
    """Update a document in the document store."""
    return await document_store.update(document_id, content)


@gateway.authorized_tool(
    resource_extractor=lambda args: f"slack:{args['channel_id']}",
    access_type="writer"
)
async def post_to_slack(channel_id: str, message: str) -> bool:
    """Post a message to a Slack channel."""
    return await slack_client.post_message(channel_id, message)

Every tool invocation now passes through authorization. The agent can't bypass it—the tools simply don't execute without valid authorization context.
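Here's a compressed, self-contained version of the same pattern, with the OpenFGA check stubbed out by an in-memory allow-set (the agent, task, and document IDs are made up), which makes the deny path easy to see:

```python
import asyncio
from functools import wraps
from typing import Callable, Any

class AuthorizationError(Exception):
    pass

# Stub policy: (agent, task, resource, access) tuples that are allowed.
ALLOWED = {("session-123", "task:weekly", "doc-1", "reader")}
AUDIT_LOG: list[dict] = []

def authorized_tool(resource_extractor: Callable[[dict], str],
                    access_type: str = "reader"):
    def decorator(tool_func: Callable):
        @wraps(tool_func)
        async def wrapper(agent_id: str, task_id: str, **kwargs) -> Any:
            resource_id = resource_extractor(kwargs)
            authorized = (agent_id, task_id, resource_id, access_type) in ALLOWED
            # Audit every decision, allowed or denied
            AUDIT_LOG.append({"tool": tool_func.__name__,
                              "resource": resource_id,
                              "authorized": authorized})
            if not authorized:
                raise AuthorizationError(f"Unauthorized: {resource_id}")
            return await tool_func(**kwargs)
        return wrapper
    return decorator

@authorized_tool(resource_extractor=lambda a: a["document_id"])
async def read_document(document_id: str) -> str:
    return f"contents of {document_id}"

async def demo():
    ok = await read_document(agent_id="session-123", task_id="task:weekly",
                             document_id="doc-1")
    try:
        await read_document(agent_id="session-123", task_id="task:weekly",
                            document_id="doc-2")
        denied = False
    except AuthorizationError:
        denied = True
    return ok, denied

result = asyncio.run(demo())
```

Both calls produce audit entries; only the in-scope one executes.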


Handling scope inference

One challenge remains: how do we determine which resources a task should have access to? The user says "summarize my emails from last week"—we need to translate that into specific permission grants.

This requires a scope inference layer that runs before task creation:

from anthropic import Anthropic

class ScopeInferenceService:
    """
    Infer required resource scopes from natural language task descriptions.
    """

    SCOPE_INFERENCE_PROMPT = """Analyze the user's request and determine the minimal required resource access.

User request: {request}

Available resource types:
- email: Gmail messages (scopes: read, send, delete)
- calendar: Google Calendar (scopes: read, write)
- documents: Google Docs (scopes: read, write)
- slack: Slack channels (scopes: read, write)
- linear: Linear issues (scopes: read, write)

Output a JSON object with:
{{
  "resources": [
    {{
      "type": "resource_type",
      "id": "specific_id or pattern",
      "access": "read or write",
      "constraints": {{
        "time_range": "if applicable",
        "filters": ["any filters"]
      }}
    }}
  ],
  "reasoning": "brief explanation of why these resources are needed"
}}

Be minimal: only include resources strictly necessary for the task.
Prefer read access over write access unless modification is explicitly requested."""

    def __init__(self, anthropic_client: Anthropic):
        self.client = anthropic_client

    async def infer_scopes(
        self,
        user_request: str,
        available_resources: list[dict]
    ) -> dict:
        """
        Infer minimal required scopes from a natural language request.
        """
        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": self.SCOPE_INFERENCE_PROMPT.format(
                    request=user_request
                )
            }]
        )

        # Parse the response
        import json
        scope_text = response.content[0].text

        # Extract JSON from response
        start = scope_text.find('{')
        end = scope_text.rfind('}') + 1
        scope_data = json.loads(scope_text[start:end])

        # Validate against available resources
        validated_resources = []
        for resource in scope_data["resources"]:
            if self._resource_available(resource, available_resources):
                validated_resources.append(resource)

        return {
            "resources": validated_resources,
            "reasoning": scope_data.get("reasoning", ""),
            "original_request": user_request
        }

    def _resource_available(
        self,
        requested: dict,
        available: list[dict]
    ) -> bool:
        """Check if a requested resource is in the available set."""
        for avail in available:
            if (avail["type"] == requested["type"] and 
                self._id_matches(requested["id"], avail["id"])):
                return True
        return False

    def _id_matches(self, requested_id: str, available_id: str) -> bool:
        """Check if a requested resource ID matches an available one."""
        if requested_id == available_id:
            return True
        if "*" in available_id:
            pattern = available_id.replace("*", ".*")
            import re
            return bool(re.match(pattern, requested_id))
        return False

The complete flow becomes:

async def initiate_agent_task(
    user_id: str,
    agent_id: str,
    user_request: str
) -> dict:
    """
    Complete flow: user request → scope inference → task creation → agent execution
    """
    # 1. Get user's available resources
    user_resources = await get_user_resources(user_id)

    # 2. Infer minimal required scopes
    scope_service = ScopeInferenceService(anthropic_client)
    inferred_scopes = await scope_service.infer_scopes(
        user_request=user_request,
        available_resources=user_resources
    )

    # 3. Create task with scoped permissions
    auth_service = AgentAuthorizationService(openfga_client, store_id)
    task_id = await auth_service.create_task_delegation(
        user_id=user_id,
        agent_id=agent_id,
        task_description=user_request,
        allowed_resources=inferred_scopes["resources"],
        ttl_minutes=30
    )

    # 4. Return task context for agent execution
    return {
        "task_id": task_id,
        "scopes": inferred_scopes,
        "agent_id": agent_id,
        "status": "ready"
    }

Production considerations

TTL and automatic expiration

Tasks should expire automatically. Implement a background job that cleans up expired tasks:

async def cleanup_expired_tasks():
    """
    Background job to revoke expired tasks.
    Run every minute via scheduler.
    """
    expired_tasks = await get_expired_task_ids()

    for task_id in expired_tasks:
        try:
            await auth_service.revoke_task(task_id)
            logger.info(f"Revoked expired task: {task_id}")
        except Exception as e:
            logger.error(f"Failed to revoke task {task_id}: {e}")

Audit trail for compliance

Every authorization decision should be logged for audit:

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class AuditEvent:
    timestamp: datetime
    event_type: str  # "delegation_created", "access_checked", "access_denied", "task_revoked"
    user_id: str
    agent_id: str
    task_id: str
    resource_id: Optional[str]
    decision: str  # "allowed", "denied"
    reason: str
    metadata: dict

async def log_audit_event(event: AuditEvent):
    """Log to your audit system of choice."""
    await audit_store.write(event)

Performance: caching authorization decisions

Authorization checks happen on every tool call. For performance, implement caching with appropriate invalidation:

from datetime import datetime, timedelta

class CachedAuthorizationService(AgentAuthorizationService):
    def __init__(self, *args, cache_ttl_seconds: int = 60, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = {}
        self.cache_ttl = cache_ttl_seconds

    async def check_agent_resource_access(
        self,
        agent_id: str,
        task_id: str,
        resource_id: str,
        access_type: str = "reader"
    ) -> tuple[bool, str]:
        cache_key = f"{agent_id}:{task_id}:{resource_id}:{access_type}"

        # Check cache
        if cache_key in self.cache:
            entry = self.cache[cache_key]
            if datetime.utcnow() < entry["expires_at"]:
                return entry["result"]

        # Cache miss - perform actual check
        result = await super().check_agent_resource_access(
            agent_id, task_id, resource_id, access_type
        )

        # Cache the result (shorter TTL for denials)
        ttl = self.cache_ttl if result[0] else 10
        self.cache[cache_key] = {
            "result": result,
            "expires_at": datetime.utcnow() + timedelta(seconds=ttl)
        }

        return result

    def invalidate_task_cache(self, task_id: str):
        """Invalidate all cache entries for a task."""
        keys_to_delete = [
            k for k in self.cache.keys() 
            if task_id in k
        ]
        for key in keys_to_delete:
            del self.cache[key]

The bigger picture

This authorization model is a foundation, not a complete solution. Real-world deployments will need:

Chain-of-thought authorization: Validating not just individual tool calls, but sequences of calls that might create emergent risks. This requires pattern detection on action sequences—a topic for a future post.

User confirmation flows: For high-risk operations, the authorization system should pause execution and request explicit user confirmation rather than auto-allowing based on inferred scopes.

Cross-agent authorization: When agents delegate to other agents (increasingly common in multi-agent systems), the delegation chain needs to preserve authorization context and enforce attenuation—each hop can only reduce permissions, never expand them.
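Attenuation is simple to state precisely: a delegation hop grants the set intersection of what the parent holds and what the child requests. A sketch (my own assumption about how such a chain could enforce "reduce, never expand"; not part of the OpenFGA model above):

```python
def attenuate(parent_scopes: set[tuple[str, str]],
              requested_scopes: set[tuple[str, str]]) -> set[tuple[str, str]]:
    """A sub-agent receives at most what its parent holds: the intersection."""
    return parent_scopes & requested_scopes

parent = {("linear:project-eng", "reader"), ("notion:page-updates", "writer")}
requested = {("linear:project-eng", "reader"), ("slack:channel-team", "writer")}

granted = attenuate(parent, requested)
# The slack scope is silently dropped: the parent never held it
```

Because intersection can never add elements, no sequence of hops can escalate beyond the original delegation.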

Federated authorization: As AI agents operate across organizational boundaries, we'll need standards for expressing and verifying delegated authority across trust domains.


Closing thoughts

We're at an inflection point with agentic AI. The capabilities are advancing faster than the security models to contain them. Every week brings new frameworks for building agents, new tools for them to invoke, new integrations to connect—and almost none of it comes with authorization baked in.

The patterns in this post aren't theoretical. They're the minimum viable security for any production agent system. The task-scoped delegation model, the authorization gateway, the audit trail—these should be table stakes, not advanced features.

The good news: the building blocks exist. OpenFGA and similar ReBAC systems provide the authorization primitives. The patterns are portable across implementations. What's missing is adoption.

If you're building agentic systems, I'd encourage you to implement authorization from day one. Retrofitting it later is painful, and the security risks of uncontrolled agents are too significant to defer.

The age of autonomous AI systems is here. Let's make sure they operate within appropriate boundaries.


Found this useful? I write about AI infrastructure, security, and the engineering challenges of building production AI systems. Connect with me on LinkedIn or Twitter/X.

The code examples in this post are available on GitHub.
