Samuel Oshin

Building PRRover: A FastAPI GitHub PR Reviewer with Telex A2A Integration

I recently built PRRover, an AI-powered GitHub Pull Request reviewer that integrates with Telex, an AI agent platform (think Zapier for AI agents). It uses the A2A (Agent-to-Agent) protocol to perform automated code reviews, analyzing security vulnerabilities, performance issues, and best-practice adherence, and delivers rich, actionable reports directly in your chat interface. The project brings together Python, FastAPI's async speed, and the A2A standard.

You can chat with the live agent and try it yourself on Telex: PRRover: The GitHub Code Review Bot. This agent monitors and analyzes GitHub Pull Requests for security, performance, and best practice adherence, delivering comprehensive, actionable reports to the user.

In this post, I'll walk you through:

  • What the A2A protocol is and how it works
  • Building an A2A-compliant agent with FastAPI
  • Implementing blocking vs non-blocking (webhook) flows
  • Using LLMs for deep code analysis
  • Testing and troubleshooting the integration

Whether you're building chat-integrated agents or exploring webhook-driven architectures, the patterns here will help you ship a production-ready A2A integration.


🎯 What We Built

PRRover is a FastAPI service that:

  • ✅ Accepts A2A JSON-RPC calls from Telex at /a2a/tasks
  • ✅ Supports blocking and non-blocking modes
    • Blocking: Returns completed analysis immediately (for tasks under 10 seconds)
    • Non-blocking: Returns "accepted" instantly, then pushes results via webhook (for complex, 30-40 second analyses)
  • ✅ Uses hybrid analysis pipeline:
    • Fast rule-based security/performance checks
    • Deep LLM-powered analysis (Google Gemini 2.0)
  • ✅ Delivers results as rich text artifacts with CWE references, severity levels, and actionable recommendations

Live Demo: https://code-reviewer-agent-a2a.onrender.com

Repository: https://github.com/SamuelOshin/code_reviewer_agent_a2a


🐍 The Core Integration: Python, FastAPI, and A2A

The entire PRRover service is built on Python, leveraging the high-performance, asynchronous web framework FastAPI. FastAPI's native support for asynchronous operations (async/await) was crucial for implementing the non-blocking A2A flow, ensuring a snappy user experience even for lengthy tasks.

Handling the A2A Protocol in FastAPI

The Agent-to-Agent (A2A) protocol communicates via JSON-RPC 2.0 over a single HTTP endpoint; PRRover exposes it at /a2a/tasks. My FastAPI implementation handles this integration by:

  1. Request Parsing: The main @router.post("/a2a/tasks") endpoint receives the incoming raw JSON payload. Since A2A is JSON-RPC, the function extracts crucial fields like method (e.g., message/send), id, and params.

  2. Asynchronous Dispatch (The Key to UX): When a lengthy task is detected (by checking the configuration.blocking flag and finding it set to false), FastAPI immediately returns an HTTP 200 response with the accepted status. Crucially, we use Python's asyncio.create_task to spawn a non-blocking background process. This frees up the HTTP worker, preventing timeouts while the actual, compute-heavy work proceeds in parallel.

  3. Webhook Fulfillment: Once the Python analysis pipeline is complete, the background task uses an asynchronous HTTP client to push the final completed task result back to the unique Telex webhook URL provided in the initial A2A request. This pattern is essential for any long-running agent built with FastAPI.


📡 Understanding the A2A Protocol

A2A uses JSON-RPC 2.0 for agent-to-agent communication. Key concepts:

Incoming Request from Telex

When a user sends a message to your agent through Telex:

{
  "jsonrpc": "2.0",
  "id": "6d484504a27146998768f486aa4a8b41",
  "method": "message/send",
  "params": {
    "message": {
      "kind": "message",
      "role": "user",
      "parts": [
        {
          "kind": "text",
          "text": "Review https://github.com/owner/repo/pull/123"
        }
      ],
      "messageId": "c4bdc8b7c96f475ba92919b80ead7729"
    },
    "configuration": {
      "blocking": false,
      "pushNotificationConfig": {
        "url": "https://ping.telex.im/v1/a2a/webhooks/...",
        "token": "eyJhbGc...",
        "authentication": {
          "schemes": ["Bearer"]
        }
      }
    }
  }
}
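To make the shape of this payload concrete, here is a small hedged sketch (a hypothetical helper, not PRRover's actual parser) that pulls out the three fields the agent cares about: the user's text, the blocking flag, and the webhook URL.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical helper illustrating which fields of an incoming
# message/send payload matter to the agent.
def parse_a2a_request(payload: Dict[str, Any]) -> Tuple[str, bool, Optional[str]]:
    """Return (user_text, blocking, webhook_url) from a message/send payload."""
    params = payload.get("params", {})
    parts = params.get("message", {}).get("parts", [])
    # Concatenate all text parts; other part kinds (files, data) are ignored here
    user_text = " ".join(p["text"] for p in parts if p.get("kind") == "text")

    config = params.get("configuration") or {}
    blocking = config.get("blocking", True)  # A2A defaults to blocking
    webhook_url = (config.get("pushNotificationConfig") or {}).get("url")
    return user_text, blocking, webhook_url
```

With the payload above, this returns the review request text, `blocking=False`, and the `ping.telex.im` webhook URL.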

Your Agent's Response

For blocking requests, return a completed task immediately:

{
  "jsonrpc": "2.0",
  "id": "6d484504a27146998768f486aa4a8b41",
  "result": {
    "id": "task-uuid",
    "contextId": "context-uuid",
    "status": {
      "state": "completed",
      "timestamp": "2025-11-03T17:35:23.727191Z",
      "message": {
        "messageId": "msg-uuid",
        "role": "agent",
        "parts": [{
          "kind": "text",
          "text": "✅ Analysis complete! Found 6 security issues..."
        }],
        "kind": "message",
        "taskId": "task-uuid",
        "timestamp": "2025-11-03T17:35:23.727162Z"
      },
      "progress": 1.0
    },
    "artifacts": [
      {
        "artifactId": "artifact-uuid",
        "name": "PR #123 Analysis",
        "parts": [{
          "kind": "text",
          "text": "# Full Analysis Report\n\n..."
        }]
      }
    ],
    "history": [],
    "kind": "task"
  }
}

For non-blocking requests, immediately return "accepted":

{
  "jsonrpc": "2.0",
  "id": "6d484504a27146998768f486aa4a8b41",
  "result": {
    "id": "task-uuid",
    "contextId": "context-uuid",
    "status": {
      "state": "accepted",
      "message": {
        "parts": [{
          "kind": "text",
          "text": "🔄 Analyzing PR now! I'll send results shortly..."
        }],
        "kind": "message",
        "taskId": "task-uuid"
      },
      "progress": 0.1
    },
    "artifacts": [],
    "history": [],
    "kind": "task"
  }
}

Then, after analysis completes, POST to the webhook:

{
  "jsonrpc": "2.0",
  "id": "webhook-rpc-id",
  "result": {
    "id": "task-uuid",
    "contextId": "context-uuid",
    "status": {
      "state": "completed"
    },
    "artifacts": [],
    "history": [],
    "kind": "task"
  }
}

Critical: Webhooks expect a JSON-RPC response (with result), not a request (with method/params). This tripped me up initially!
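A tiny helper makes the distinction hard to get wrong. This is an illustrative sketch (the helper name is mine, not PRRover's): wrap the completed task under a top-level "result" key, never under "method"/"params".

```python
import uuid
from typing import Any, Dict

# Illustrative helper for the gotcha above: the webhook body must be a
# JSON-RPC *response* (top-level "result"), not a request ("method"/"params").
def build_webhook_payload(task: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),  # fresh RPC id for the webhook call
        "result": task,           # the completed task object
    }
```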


🏗️ Architecture Overview

The following diagram illustrates the non-blocking flow where the FastAPI application delegates the long-running analysis to a background task and fulfills the result via the Telex webhook.

┌─────────────────────────────────────────────────────────────┐
│                       Telex Platform                        │
│                     (User sends PR URL)                     │
└────────────────────────┬────────────────────────────────────┘
                         │ POST /a2a/tasks (JSON-RPC)
                         │ blocking: false + webhook config
                         ▼
┌─────────────────────────────────────────────────────────────┐
│                 FastAPI Application (Python)                │
│ ┌──────────────────────────────────────────────────────┐  │
│ │   app/routes/a2a_tasks.py                            │  │
│ │   • Receives JSON-RPC requests                       │  │
│ │   • Routes to message handler                        │  │
│ └─────────────────────┬────────────────────────────────┘  │
│                       │                                   │
│ ┌─────────────────────▼────────────────────────────────┐  │
│ │   app/services/message_handler.py                    │  │
│ │   • Parse message & configuration                    │  │
│ │   • Blocking: analyze synchronously                  │  │
│ │   • Non-blocking: return "accepted" + spawn bg task  │  │
│ └─────────────────────┬────────────────────────────────┘  │
│                       │                                   │
│ ┌─────────────────────▼────────────────────────────────┐  │
│ │   app/services/code_analyzer.py                      │  │
│ │   1. Fetch PR from GitHub (via MCP)                  │  │
│ │   2. Rule-based checks (security, performance)       │  │
│ │   3. LLM analysis (Gemini 2.0)                       │  │
│ │     • analyze_security()                             │  │
│ │     • analyze_performance()                          │  │
│ │     • analyze_best_practices()                       │  │
│ │   4. Generate summary                                │  │
│ └─────────────────────┬────────────────────────────────┘  │
│                       │                                   │
│ ┌─────────────────────▼────────────────────────────────┐  │
│ │   app/utils/formatters.py                            │  │
│ │   • Format for Telex UI (emoji, markdown)            │  │
│ │   • Create artifact text                             │  │
│ └──────────────────────────────────────────────────────┘  │
└────────────────────────┬────────────────────────────────────┘
                         │ For non-blocking:
                         │ POST webhook with completed result
                         ▼
┌─────────────────────────────────────────────────────────────┐
│            Telex Webhook (ping.telex.im)                    │
│          Receives completed task, displays to user          │
└─────────────────────────────────────────────────────────────┘

🚀 Implementation: FastAPI + Non-Blocking Flow

1. FastAPI Route

The core of the FastAPI integration is setting up a single asynchronous route to receive all A2A traffic and handle the JSON-RPC parsing:

# app/routes/a2a_tasks.py
import logging

from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse

from app.services import message_handler

logger = logging.getLogger(__name__)
router = APIRouter()

@router.post("/a2a/tasks")
async def handle_a2a_task(request: Request):
    """A2A endpoint for Telex integration"""
    rpc_id = None  # keep in scope for the error response below
    try:
        payload = await request.json()

        # Extract JSON-RPC fields
        rpc_id = payload.get("id")
        method = payload.get("method")
        params = payload.get("params", {})

        if method == "message/send":
            # Delegate to message handler
            result = await message_handler.handle_message_send(
                message=params.get("message"),
                configuration=params.get("configuration")
            )

            # Return JSON-RPC response
            return JSONResponse({
                "jsonrpc": "2.0",
                "id": rpc_id,
                "result": result
            })

        # Unknown method -> JSON-RPC "method not found"
        return JSONResponse({
            "jsonrpc": "2.0",
            "id": rpc_id,
            "error": {"code": -32601, "message": f"Method not found: {method}"}
        }, status_code=404)
    except Exception as e:
        logger.error(f"Error handling request: {e}")
        return JSONResponse({
            "jsonrpc": "2.0",
            "id": rpc_id,
            "error": {
                "code": -32603,
                "message": str(e)
            }
        }, status_code=500)

2. Message Handler (Non-Blocking Pattern)

Here's the key Python pattern for handling long-running tasks asynchronously using asyncio:

# app/services/message_handler.py
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

async def handle_message_send(
    self,
    message: Dict[str, Any],
    configuration: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Handle message/send JSON-RPC method"""

    # Parse configuration
    is_blocking = configuration.get("blocking", True) if configuration else True
    push_config = configuration.get("pushNotificationConfig") if configuration else None

    # Extract PR URL from message
    pr_url = self._extract_pr_url(message)

    task_id = str(uuid.uuid4())
    context_id = message.get("contextId", str(uuid.uuid4()))
    message_id = message.get("messageId")

    # Non-blocking mode: return accepted, analyze in background
    if not is_blocking and push_config:
        # Start background task using Python's asyncio
        asyncio.create_task(
            self._process_and_push_safe(
                pr_url=pr_url,
                task_id=task_id,
                message_id=message_id,
                message=message,
                push_config=push_config
            )
        )

        # Return "accepted" immediately
        return {
            "id": task_id,
            "contextId": context_id,
            "status": {
                "state": "accepted",
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "message": {
                    "messageId": str(uuid.uuid4()),
                    "role": "agent",
                    "parts": [{
                        "kind": "text",
                        "text": f"🔄 Analyzing PR! Results coming shortly...\n\n📎 {pr_url}"
                    }],
                    "kind": "message",
                    "taskId": task_id
                },
                "progress": 0.1
            },
            "artifacts": [],
            "history": [],
            "kind": "task"
        }

    # Blocking mode: analyze and return completed task
    return await self._analyze_and_return(pr_url, task_id, message_id, message)
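The handler above relies on `self._extract_pr_url(message)`. One way such a helper could work (a hypothetical sketch, not the exact implementation) is to scan the message's text parts for a GitHub PR URL with a regex:

```python
import re
from typing import Any, Dict, Optional

# Hypothetical sketch of a PR-URL extractor: scan text parts of an A2A
# message for the first GitHub pull request URL.
PR_URL_RE = re.compile(r"https://github\.com/[\w.-]+/[\w.-]+/pull/\d+")

def extract_pr_url(message: Dict[str, Any]) -> Optional[str]:
    for part in message.get("parts", []):
        if part.get("kind") == "text":
            match = PR_URL_RE.search(part.get("text", ""))
            if match:
                return match.group(0)
    return None
```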

3. Background Task with Webhook Push

The final stage of the non-blocking A2A flow is pushing the result back to Telex via the provided webhook URL, using an asynchronous client like httpx within the Python background task:

# app/services/message_handler.py (continued; uses httpx for the async POST)
async def _process_and_push_safe(
    self,
    pr_url: str,
    task_id: str,
    message_id: str,
    message: Dict[str, Any],
    push_config: Dict[str, Any]
):
    """Analyze PR and push result to Telex webhook"""
    try:
        # Run analysis (30-40 seconds)
        result = await self._analyze_and_return(pr_url, task_id, message_id, message)

        # Build webhook payload (JSON-RPC response format!)
        webhook_payload = {
            "jsonrpc": "2.0",
            "id": str(uuid.uuid4()),
            "result": result  # The completed task
        }

        # Push to Telex webhook
        webhook_url = push_config.get("url")
        token = push_config.get("token")

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}"
        }

        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                webhook_url,
                json=webhook_payload,
                headers=headers
            )
            response.raise_for_status()
            logger.info(f"✅ Webhook push successful: {response.status_code}")

    except Exception as e:
        logger.error(f"❌ Webhook push failed: {e}")
        # Optionally send error to webhook
        await self._push_error_to_telex(task_id, message, push_config, str(e))

🤖 LLM-Powered Analysis

The heart of PRRover is its multi-stage analysis pipeline. This function showcases the high-level steps, integrating both fast, rule-based checks and the deeper, slower LLM analysis.

# app/services/code_analyzer.py
async def analyze_pr(self, pr_url: str) -> CodeAnalysisResult:
    """Analyze a GitHub Pull Request"""

    # Step 1: Fetch PR data from GitHub (via MCP)
    pr_data, diff_content, files = await self._fetch_pr_data(pr_url)

    # Step 2: Parse diff content into per-file changes
    file_changes = DiffParser.parse(diff_content)

    # Step 3: Rule-based checks (fast, synchronous)
    rule_security_issues = SecurityChecker.check(file_changes)
    rule_performance_issues = PerformanceChecker.check(file_changes)

    # Step 4: LLM-powered analysis (comprehensive, slower)
    llm_security_issues = await self.llm_service.analyze_security(
        diff_content, files
    )
    llm_performance_issues = await self.llm_service.analyze_performance(
        diff_content, files
    )
    llm_best_practice_issues = await self.llm_service.analyze_best_practices(
        diff_content, files
    )

    # Step 5: Merge rule-based and LLM findings
    all_security_issues = rule_security_issues + llm_security_issues
    all_performance_issues = rule_performance_issues + llm_performance_issues
    all_best_practice_issues = llm_best_practice_issues

    # Step 6: Generate executive summary
    executive_summary = await self.llm_service.generate_summary(
        pr_title=pr_data.title,
        pr_author=pr_data.user.login,
        security_issues=all_security_issues,
        performance_issues=all_performance_issues,
        best_practice_issues=all_best_practice_issues
    )

    # Step 7: Calculate overall risk from merged findings
    risk_level = self._calculate_risk_level(
        all_security_issues,
        all_performance_issues
    )

    return CodeAnalysisResult(...)

Why hybrid analysis?

  • Rule-based: Fast pattern matching for common issues (hardcoded secrets, SQL injection patterns).
  • LLM-powered: Deep semantic understanding, context-aware recommendations, executive summaries.
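To give a feel for the rule-based layer, here is an illustrative sketch (hypothetical patterns and function name, not PRRover's full SecurityChecker): cheap regexes flag obvious problems in added diff lines before the slower LLM pass runs.

```python
import re
from typing import Dict, List

# Illustrative rule-based security pass (patterns are examples only).
SECRET_PATTERNS = [
    (re.compile(r"(api[_-]?key|secret|password)\s*=\s*['\"][^'\"]+['\"]", re.I),
     "Possible hardcoded credential (CWE-798)"),
    (re.compile(r"\bexecute\([^)]*%s", re.I),
     "Possible SQL injection via string formatting (CWE-89)"),
]

def rule_based_security_check(added_lines: Dict[str, List[str]]) -> List[dict]:
    """added_lines maps filename -> list of added diff lines."""
    issues = []
    for filename, lines in added_lines.items():
        for lineno, line in enumerate(lines, start=1):
            for pattern, description in SECRET_PATTERNS:
                if pattern.search(line):
                    issues.append({"file": filename, "line": lineno,
                                   "description": description})
    return issues
```

Checks like these run in milliseconds, which is why they complement rather than replace the 30-40 second LLM pass.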

LLM Model: Google Gemini 2.0 Flash Lite

Typical latency: 30-40 seconds for a complete analysis

This is exactly why non-blocking mode matters—users get instant acknowledgement while the AI does its work.


📸 PRRover in Action: Non-Blocking Success

The core benefit of the non-blocking A2A architecture is the user experience: giving instant acknowledgement while the heavy lifting (the 30-40 second LLM analysis) happens safely in the background.

When a review is requested, the agent immediately responds with an acknowledgement message: "Analyzing the Pull Request for security vulnerabilities and performance issues! Please hold on a moment."

Once the full analysis of PR #11: Feature/atomic property image upload is complete, the background task uses the webhook to push the comprehensive report directly into the Telex chat as a rich artifact. This provides a polished, non-disruptive, high-fidelity result, and the user is never left waiting on a timeout.


🧪 Testing Your A2A Integration

I built a comprehensive test suite to validate A2A compliance:

# Install dependencies
pip install httpx

# Run tests
python scripts/test_a2a_agent.py https://your-agent-url.com

The test suite validates:

  • Agent Card: Presence, format, required fields
  • Endpoint Accessibility: /a2a/tasks accepts POST
  • JSON-RPC Format: Valid jsonrpc: "2.0", result or error
  • Task Structure: Correct id, status, kind fields
  • Blocking Mode: Returns completed state
  • Non-Blocking Mode: Returns accepted state
  • Artifacts: Proper format with kind: "text"

Example output:

============================================================
📊 **GRADING REPORT**
============================================================
🎯 Score: 22/28 (78.6%)
✅ Passed: 11/14
❌ Failed: 3/14
⏱️  Execution Time: 10.23s
============================================================

PowerShell users can use the convenient runner:

.\scripts\run_tests.ps1 https://your-agent-url.com

🐛 Troubleshooting: Lessons Learned

1. Webhook Format Confusion

Problem: Webhook returns 400, Telex shows "could not find message"

Solution: Telex expects a JSON-RPC response format for webhooks.

// ❌ WRONG (request format)
{
  "jsonrpc": "2.0",
  "method": "message/send",
  "params": { ... }
}

// ✅ CORRECT (response format)
{
  "jsonrpc": "2.0",
  "id": "rpc-id",
  "result": { /* task */ }
}

2. Blocking Request Timeouts

Problem: Long LLM analyses (30s+) timeout in blocking mode

Solution: Use non-blocking mode for analyses greater than 10 seconds. Return "accepted" immediately, then push results via webhook using Python's background task facilities.

3. Webhook Authentication

Problem: Webhook POST returns 401 Unauthorized

Solution: Check pushNotificationConfig.authentication.schemes and set the appropriate header:

auth_schemes = push_config.get("authentication", {}).get("schemes", [])
if "Bearer" in auth_schemes:
    headers["Authorization"] = f"Bearer {token}"

4. Debugging Webhook Payloads

Add clear markers to logs for easy extraction:

logger.info("WEBHOOK_PAYLOAD_JSON_START")
logger.info(json.dumps(webhook_payload, indent=2))
logger.info("WEBHOOK_PAYLOAD_JSON_END")

Then you can copy/paste and test with curl:

curl -X POST "https://ping.telex.im/v1/a2a/webhooks/..." \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d @webhook_payload.json

📈 Results & Impact

What works:

  • ✅ Instant acknowledgement for long-running tasks (< 1s response)
  • ✅ Comprehensive LLM-powered analysis (security, performance, best practices)
  • ✅ Rich artifacts with CWE references and actionable recommendations
  • ✅ 78.6% test pass rate (11/14 tests passing)
  • ✅ Production-ready on Render.com

Sample Analysis Output:

✅ Code Review Complete - PR #123

Feature/atomic property image upload by @SamuelOshin

🟢 Risk: LOW | ✅ Approve

📊 16 files changed (+194/-59) • 20 issues found

Issues Found:
🔒 6 Security issues
⚡ 7 Performance issues
📐 7 Best Practice violations

💡 Top Concerns:
1. 📐 BEST PRACTICES: Focus on code organization (3 issues)

[Full detailed artifact with line numbers, CWE IDs, and fix suggestions]

🚀 Next Steps & Scaling

Immediate improvements:

  • [ ] Move LLM calls to Celery/RQ worker queue for horizontal scaling
  • [ ] Add Redis caching for PR diffs and LLM responses
  • [ ] Implement webhook retry with exponential backoff
  • [ ] Add structured logging (JSON) for better observability
  • [ ] Support multiple LLM providers (OpenAI, Anthropic)
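The webhook-retry item on the list above could be sketched like this (a hypothetical helper, not yet in PRRover): retry an async POST with doubling delays plus jitter.

```python
import asyncio
import random

# Hypothetical retry wrapper: `send` is an async callable that performs
# the webhook POST and raises on failure (e.g. via raise_for_status()).
async def post_with_retry(send, max_attempts=4, base_delay=1.0):
    for attempt in range(max_attempts):
        try:
            return await send()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts; surface the last error
            # Exponential backoff with jitter: 1s, 2s, 4s, ... (+ up to 0.5s)
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```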

Feature ideas:

  • [ ] Support review comments directly on GitHub
  • [ ] Diff-level suggestions (not just file-level)
  • [ ] Team-specific rule customization
  • [ ] Historical trend analysis ("security issues down 40% this month")

🎓 Key Takeaways

  1. A2A uses JSON-RPC 2.0 — understand the request/response format differences.
  2. Non-blocking + webhooks = better UX — users don't wait for slow operations, thanks to Python's asyncio capabilities.
  3. Webhook payloads are responses, not requests — this is the #1 gotcha.
  4. Test everything — build automated tests early to catch protocol issues.
  5. Log everything — especially webhook payloads for debugging.



💬 Let's Connect

Building agents? Hit me up! I'd love to hear about your A2A integrations or help debug webhook issues 😄

Questions? Drop them in the comments below! 👇

Happy coding! 🚀
