5 AI-Powered Code Review Pipelines You Can Build This Weekend

Your Code Reviews Are a Bottleneck

Every engineering team has the same problem: PRs sit in review queues for hours (or days). Senior engineers can easily sink a third of their week into reviews. And half the feedback is stuff a machine could catch.

AI code review isn't about replacing human reviewers. It's about letting humans focus on architecture and design while AI handles the mechanical stuff.

Here are 5 pipelines you can build this weekend, starting from dead-simple and scaling up.


Pipeline 1: The PR Diff Analyzer

The simplest useful pipeline. Feed a PR diff to an LLM and get structured feedback.

import json
import subprocess
from dataclasses import dataclass, field
from typing import Any

@dataclass
class ReviewComment:
    """A single review comment on a code change."""
    file: str
    line: int
    severity: str  # "critical", "warning", "suggestion", "nitpick"
    category: str  # "bug", "security", "performance", "style", "logic"
    message: str
    suggested_fix: str | None = None

@dataclass
class ReviewResult:
    """Complete review result for a PR."""
    pr_number: int
    total_files: int
    total_comments: int
    critical_count: int
    comments: list[ReviewComment] = field(default_factory=list)
    summary: str = ""
    approve: bool = True

class PRDiffAnalyzer:
    """
    Analyze git diffs and produce structured code review feedback.
    Works with any LLM that supports chat completions.
    """

    REVIEW_PROMPT = """You are an expert code reviewer. Analyze this git diff and provide structured feedback.

## Diff
```diff
{diff}
```

## Review Guidelines
- Focus on bugs, security issues, and logic errors
- Flag performance problems with concrete impact
- Note missing error handling
- Suggest improvements, not just problems
- Be specific: reference file names and line numbers

## Output Format
Respond with a JSON object:
{{
    "comments": [
        {{
            "file": "path/to/file.py",
            "line": 42,
            "severity": "critical|warning|suggestion|nitpick",
            "category": "bug|security|performance|style|logic",
            "message": "Description of the issue",
            "suggested_fix": "Code or description of fix (optional)"
        }}
    ],
    "summary": "One paragraph overall assessment",
    "approve": true/false
}}"""

    def __init__(self, llm_client: Any = None, model: str = "gpt-4o"):
        self.llm_client = llm_client
        self.model = model

    def get_diff(
        self, base_branch: str = "main", head_branch: str = "HEAD"
    ) -> str:
        """Get git diff between two branches."""
        result = subprocess.run(
            ["git", "diff", f"{base_branch}...{head_branch}"],
            capture_output=True,
            text=True,
        )
        return result.stdout

    def get_pr_diff(self, pr_number: int) -> str:
        """Get diff for a GitHub PR using the CLI."""
        result = subprocess.run(
            ["gh", "pr", "diff", str(pr_number)],
            capture_output=True,
            text=True,
        )
        return result.stdout

    def _chunk_diff(self, diff: str, max_chars: int = 12000) -> list[str]:
        """Split large diffs into reviewable chunks."""
        files = diff.split("diff --git ")
        chunks = []
        current_chunk = ""

        for file_diff in files:
            if not file_diff.strip():
                continue
            # Restore the marker that split() consumed so every chunk
            # starts with a well-formed file header
            file_diff = f"diff --git {file_diff}"
            if current_chunk and len(current_chunk) + len(file_diff) > max_chars:
                chunks.append(current_chunk)
                current_chunk = file_diff
            else:
                current_chunk += ("\n" if current_chunk else "") + file_diff

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _call_llm(self, prompt: str) -> str:
        """Call LLM for review. Override or inject your client."""
        import httpx
        import os

        response = httpx.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
            json={
                "model": self.model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "response_format": {"type": "json_object"},
            },
            timeout=60,
        )
        return response.json()["choices"][0]["message"]["content"]

    def review_diff(self, diff: str) -> ReviewResult:
        """Review a complete diff."""
        chunks = self._chunk_diff(diff)
        all_comments = []
        summaries = []

        for chunk in chunks:
            prompt = self.REVIEW_PROMPT.format(diff=chunk)
            raw = self._call_llm(prompt)
            parsed = json.loads(raw)

            for c in parsed.get("comments", []):
                # LLM output is untrusted: keep only the fields we expect
                known = ReviewComment.__dataclass_fields__.keys()
                all_comments.append(
                    ReviewComment(**{k: v for k, v in c.items() if k in known})
                )
            summaries.append(parsed.get("summary", ""))

        critical_count = sum(
            1 for c in all_comments if c.severity == "critical"
        )

        return ReviewResult(
            pr_number=0,
            total_files=len(chunks),
            total_comments=len(all_comments),
            critical_count=critical_count,
            comments=all_comments,
            summary=" ".join(summaries),
            approve=critical_count == 0,
        )


# --- Usage ---

analyzer = PRDiffAnalyzer(model="gpt-4o")

# Review current branch against main
# diff = analyzer.get_diff("main", "HEAD")
# result = analyzer.review_diff(diff)
# print(f"Comments: {result.total_comments}, Critical: {result.critical_count}")
# print(f"Approve: {'✅' if result.approve else '❌'}")

Cost: ~$0.02-0.10 per PR depending on diff size.
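That estimate is easy to sanity-check for your own diffs. A back-of-the-envelope sketch, where the ~4 characters/token ratio and the per-1K-token prices are assumptions to replace with your model's actual rates:

```python
def estimate_review_cost(
    diff: str,
    input_price_per_1k: float = 0.0025,   # assumed rate; check your model's pricing
    output_price_per_1k: float = 0.01,    # assumed rate
    expected_output_tokens: int = 800,    # rough size of a structured review
) -> float:
    """Rough per-review cost in dollars: code runs ~4 characters per token."""
    input_tokens = len(diff) / 4
    return (
        input_tokens / 1000 * input_price_per_1k
        + expected_output_tokens / 1000 * output_price_per_1k
    )

# A 20 KB diff at these assumed rates:
cost = estimate_review_cost("x" * 20_000)
```

At these numbers a 20 KB diff costs roughly two cents, which is why the chunking in `_chunk_diff` matters more for context limits than for your bill.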


Pipeline 2: Security-Focused Scanner

A specialized pipeline that focuses exclusively on security vulnerabilities.

import re
import json
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any

class VulnSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

@dataclass
class SecurityFinding:
    """A security vulnerability finding."""
    file: str
    line: int
    severity: VulnSeverity
    cwe_id: str
    title: str
    description: str
    remediation: str
    confidence: float  # 0.0 to 1.0

    @property
    def is_blocking(self) -> bool:
        return self.severity in (VulnSeverity.CRITICAL, VulnSeverity.HIGH)

class SecurityScanner:
    """
    Combines static pattern matching with LLM-powered
    contextual security analysis.
    """

    # Static patterns for common vulnerabilities
    VULN_PATTERNS = {
        "sql_injection": {
            "pattern": r"""(?x)
                (?:execute|cursor\.execute|raw|RawSQL)\s*\(
                \s*f['""]  |
                (?:execute|cursor\.execute)\s*\(\s*
                ['""].*?%s.*?['""].*?%  |
                \.format\s*\(.*?\).*?(?:execute|query)
            """,
            "cwe": "CWE-89",
            "severity": VulnSeverity.CRITICAL,
            "title": "Potential SQL Injection",
        },
        "hardcoded_secret": {
            "pattern": r"""(?x)
                (?:password|secret|token|api_key|apikey|
                   private_key|auth_token)\s*=\s*
                ['""][^'""]{8,}['""]
            """,
            "cwe": "CWE-798",
            "severity": VulnSeverity.HIGH,
            "title": "Hardcoded Secret",
        },
        "command_injection": {
            "pattern": r"""(?x)
                (?:os\.system|os\.popen|subprocess\.call|
                   subprocess\.Popen|subprocess\.run)\s*\(
                \s*(?:f['""]|.*?\.format|.*?\+\s*(?!['""]))
            """,
            "cwe": "CWE-78",
            "severity": VulnSeverity.CRITICAL,
            "title": "Potential Command Injection",
        },
        "path_traversal": {
            "pattern": r"""(?x)
                (?:open|Path)\s*\(\s*
                (?:request\.|user_input|params|
                   .*?\+\s*(?:request|input))
            """,
            "cwe": "CWE-22",
            "severity": VulnSeverity.HIGH,
            "title": "Potential Path Traversal",
        },
        "insecure_deserialization": {
            "pattern": r"""(?x)
                (?:pickle\.loads?|yaml\.(?:load|unsafe_load)|
                   marshal\.loads?)\s*\(
            """,
            "cwe": "CWE-502",
            "severity": VulnSeverity.HIGH,
            "title": "Insecure Deserialization",
        },
        "weak_crypto": {
            "pattern": r"""(?x)
                (?:hashlib\.(?:md5|sha1)|
                   DES\.|Blowfish|RC4|
                   random\.(?:random|randint|choice))
            """,
            "cwe": "CWE-327",
            "severity": VulnSeverity.MEDIUM,
            "title": "Weak Cryptography",
        },
        "ssrf": {
            "pattern": r"""(?x)
                (?:requests\.(?:get|post|put|delete|patch)|
                   httpx\.(?:get|post|put)|
                   urllib\.request\.urlopen)\s*\(
                \s*(?:f['""]|.*?\.format|.*?\+)
            """,
            "cwe": "CWE-918",
            "severity": VulnSeverity.HIGH,
            "title": "Potential SSRF",
        },
    }

    def __init__(self):
        self.findings: list[SecurityFinding] = []

    def scan_file(self, file_path: str, content: str) -> list[SecurityFinding]:
        """Scan a single file for vulnerabilities."""
        findings = []
        lines = content.split("\n")

        for vuln_name, config in self.VULN_PATTERNS.items():
            for i, line in enumerate(lines, 1):
                # Skip comments
                stripped = line.strip()
                if stripped.startswith("#") or stripped.startswith("//"):
                    continue

                if re.search(config["pattern"], line, re.IGNORECASE):
                    finding = SecurityFinding(
                        file=file_path,
                        line=i,
                        severity=config["severity"],
                        cwe_id=config["cwe"],
                        title=config["title"],
                        description=f"Found {vuln_name} pattern in: {line.strip()[:100]}",
                        remediation=self._get_remediation(vuln_name),
                        confidence=0.7,
                    )
                    findings.append(finding)

        self.findings.extend(findings)
        return findings

    def scan_directory(
        self,
        directory: str,
        extensions: list[str] | None = None,
    ) -> list[SecurityFinding]:
        """Scan all files in a directory."""
        extensions = extensions or [".py", ".js", ".ts", ".go", ".rb", ".java"]
        all_findings = []

        for path in Path(directory).rglob("*"):
            if path.suffix in extensions and path.is_file():
                content = path.read_text(errors="ignore")
                findings = self.scan_file(str(path), content)
                all_findings.extend(findings)

        return all_findings

    def scan_diff(self, diff: str) -> list[SecurityFinding]:
        """Scan only the added lines in a diff."""
        findings = []
        current_file = ""
        current_line = 0

        in_hunk = False
        for line in diff.split("\n"):
            if line.startswith("diff --git"):
                in_hunk = False
            elif line.startswith("+++ b/"):
                current_file = line[6:]
            elif line.startswith("@@ "):
                # Parse the new-file start line from the hunk header
                match = re.search(r"\+(\d+)", line)
                if match:
                    current_line = int(match.group(1)) - 1
                in_hunk = True
            elif not in_hunk:
                # Header lines (index, mode, etc.) don't advance line numbers
                continue
            elif line.startswith("+") and not line.startswith("+++"):
                current_line += 1
                added_line = line[1:]

                for vuln_name, config in self.VULN_PATTERNS.items():
                    if re.search(
                        config["pattern"], added_line, re.IGNORECASE
                    ):
                        findings.append(
                            SecurityFinding(
                                file=current_file,
                                line=current_line,
                                severity=config["severity"],
                                cwe_id=config["cwe"],
                                title=config["title"],
                                description=(
                                    f"New code introduces {vuln_name}: "
                                    f"{added_line.strip()[:100]}"
                                ),
                                remediation=self._get_remediation(vuln_name),
                                confidence=0.7,
                            )
                        )
            elif not line.startswith("-"):
                current_line += 1

        self.findings.extend(findings)
        return findings

    def _get_remediation(self, vuln_name: str) -> str:
        remediations = {
            "sql_injection": "Use parameterized queries or an ORM. Never interpolate user input into SQL strings.",
            "hardcoded_secret": "Move secrets to environment variables or a secrets manager (AWS Secrets Manager, Vault, etc).",
            "command_injection": "Use subprocess with a list of arguments instead of shell=True. Validate and sanitize all inputs.",
            "path_traversal": "Use pathlib.resolve() and check the resolved path starts with your allowed base directory.",
            "insecure_deserialization": "Use json.loads() instead of pickle/yaml.load(). If pickle is necessary, use hmac verification.",
            "weak_crypto": "Use hashlib.sha256 or bcrypt for passwords. Use secrets.token_hex() for random tokens.",
            "ssrf": "Validate URLs against an allowlist. Block internal/private IP ranges. Use a URL parser to verify the hostname.",
        }
        return remediations.get(vuln_name, "Review and fix the identified vulnerability.")

    def get_report(self) -> dict:
        blocking = [f for f in self.findings if f.is_blocking]
        return {
            "total_findings": len(self.findings),
            "blocking": len(blocking),
            "by_severity": {
                s.value: sum(1 for f in self.findings if f.severity == s)
                for s in VulnSeverity
            },
            "should_block": len(blocking) > 0,
            "findings": [
                {
                    "file": f.file,
                    "line": f.line,
                    "severity": f.severity.value,
                    "cwe": f.cwe_id,
                    "title": f.title,
                    "description": f.description,
                    "remediation": f.remediation,
                }
                for f in sorted(
                    self.findings,
                    key=lambda x: list(VulnSeverity).index(x.severity),
                )
            ],
        }


# --- Usage ---

scanner = SecurityScanner()

# Scan a diff
# diff = subprocess.run(["git", "diff", "main...HEAD"], capture_output=True, text=True).stdout
# findings = scanner.scan_diff(diff)

# Or scan a directory
# findings = scanner.scan_directory("./src")

# Get report
# report = scanner.get_report()
# if report["should_block"]:
#     print(f"🚨 {report['blocking']} blocking security issues found!")
#     sys.exit(1)
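Before trusting a new pattern, test it standalone against a known-bad and a known-good line. Here's a minimal check for the f-string-into-`execute()` case, using a simplified variant of the `sql_injection` regex above (not the exact production pattern):

```python
import re

# Simplified variant of the sql_injection pattern: flag f-strings passed
# directly into execute(). Expect some false positives; that's why each
# finding carries a confidence score instead of a verdict.
SQLI = re.compile(r"execute\s*\(\s*f['\"]", re.IGNORECASE)

vulnerable = 'cursor.execute(f"SELECT * FROM users WHERE id={user_id}")'
safe = 'cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))'

print(bool(SQLI.search(vulnerable)))  # True
print(bool(SQLI.search(safe)))        # False
```

A quick pair of lines like this in your test suite catches regex regressions long before the scanner starts missing real vulnerabilities in CI.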

Pipeline 3: The Multi-Pass Reviewer

Different aspects of code need different review strategies. This pipeline runs multiple specialized passes.

import json
from dataclasses import dataclass, field
from typing import Any, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed

@dataclass
class ReviewPass:
    """A single specialized review pass."""
    name: str
    system_prompt: str
    focus_areas: list[str]
    severity_weight: float = 1.0

@dataclass
class MultiPassResult:
    """Combined results from all review passes."""
    passes_completed: int
    total_comments: int
    weighted_score: float
    comments_by_pass: dict[str, list[dict]]
    summary: str
    decision: str  # "approve", "request_changes", "block"

class MultiPassReviewer:
    """
    Run multiple specialized review passes over the same code.
    Each pass focuses on a different quality dimension.
    """

    DEFAULT_PASSES = [
        ReviewPass(
            name="correctness",
            system_prompt="""You are a correctness-focused code reviewer.
Look ONLY for:
- Logic errors and bugs
- Off-by-one errors
- Null/undefined handling
- Race conditions
- Edge cases not handled
Do NOT comment on style, naming, or performance.""",
            focus_areas=["bugs", "logic", "edge_cases"],
            severity_weight=3.0,
        ),
        ReviewPass(
            name="security",
            system_prompt="""You are a security-focused code reviewer.
Look ONLY for:
- Injection vulnerabilities (SQL, command, XSS)
- Authentication/authorization flaws
- Data exposure risks
- Insecure dependencies
- Missing input validation
Do NOT comment on style or performance.""",
            focus_areas=["security", "auth", "injection"],
            severity_weight=5.0,
        ),
        ReviewPass(
            name="performance",
            system_prompt="""You are a performance-focused code reviewer.
Look ONLY for:
- O(n²) or worse algorithms where O(n) exists
- Unnecessary database queries (N+1)
- Memory leaks or excessive allocations
- Missing caching opportunities
- Blocking operations in async contexts
Do NOT comment on style or correctness.""",
            focus_areas=["performance", "scalability", "efficiency"],
            severity_weight=2.0,
        ),
        ReviewPass(
            name="maintainability",
            system_prompt="""You are a maintainability-focused code reviewer.
Look ONLY for:
- Functions that are too long (>50 lines)
- Missing or misleading documentation
- Poor naming that obscures intent
- Duplicated logic that should be extracted
- Missing type hints on public APIs
Be brief. Only comment on significant issues.""",
            focus_areas=["readability", "documentation", "structure"],
            severity_weight=1.0,
        ),
    ]

    PASS_PROMPT_TEMPLATE = """{system_prompt}

## Code to Review
```
{code}
```

## Output Format
Return JSON:
{{
    "comments": [
        {{
            "file": "filename",
            "line": 0,
            "issue": "description",
            "severity": "critical|warning|suggestion",
            "fix": "suggested fix (optional)"
        }}
    ],
    "pass_score": <1-10>,
    "summary": "one sentence"
}}"""

    def __init__(
        self,
        passes: list[ReviewPass] | None = None,
        llm_fn: Callable[[str], str] | None = None,
        parallel: bool = True,
    ):
        self.passes = passes or self.DEFAULT_PASSES
        self.llm_fn = llm_fn or self._default_llm
        self.parallel = parallel

    def _default_llm(self, prompt: str) -> str:
        """Default LLM call; replace with your implementation."""
        import httpx
        import os

        response = httpx.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
            json={
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "response_format": {"type": "json_object"},
            },
            timeout=60,
        )
        return response.json()["choices"][0]["message"]["content"]

    def _run_single_pass(
        self, review_pass: ReviewPass, code: str
    ) -> tuple[str, dict]:
        """Run a single review pass."""
        prompt = self.PASS_PROMPT_TEMPLATE.format(
            system_prompt=review_pass.system_prompt,
            code=code,
        )
        raw = self.llm_fn(prompt)
        parsed = json.loads(raw)
        return review_pass.name, parsed

    def review(self, code: str) -> MultiPassResult:
        """Run all review passes and combine results."""
        pass_results: dict[str, dict] = {}

        if self.parallel:
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = {
                    executor.submit(
                        self._run_single_pass, rp, code
                    ): rp.name
                    for rp in self.passes
                }
                for future in as_completed(futures):
                    name, result = future.result()
                    pass_results[name] = result
        else:
            for rp in self.passes:
                name, result = self._run_single_pass(rp, code)
                pass_results[name] = result

        # Combine and score
        all_comments = {}
        total_comments = 0
        weighted_scores = []

        for rp in self.passes:
            name = rp.name
            if name in pass_results:
                result = pass_results[name]
                comments = result.get("comments", [])
                all_comments[name] = comments
                total_comments += len(comments)
                score = result.get("pass_score", 5)
                weighted_scores.append(score * rp.severity_weight)

        # Calculate weighted average
        total_weight = sum(rp.severity_weight for rp in self.passes)
        weighted_avg = (
            sum(weighted_scores) / total_weight if total_weight else 5.0
        )

        # Decision logic
        has_critical = any(
            c.get("severity") == "critical"
            for comments in all_comments.values()
            for c in comments
        )

        if has_critical:
            decision = "block"
        elif weighted_avg < 5.0:
            decision = "request_changes"
        else:
            decision = "approve"

        summaries = [
            f"**{name}**: {pass_results[name].get('summary', 'N/A')}"
            for name in pass_results
        ]

        return MultiPassResult(
            passes_completed=len(pass_results),
            total_comments=total_comments,
            weighted_score=round(weighted_avg, 1),
            comments_by_pass=all_comments,
            summary="\n".join(summaries),
            decision=decision,
        )


# --- Usage ---

# reviewer = MultiPassReviewer(parallel=True)
# result = reviewer.review(code_to_review)
# print(f"Decision: {result.decision}")
# print(f"Score: {result.weighted_score}/10")
# print(f"Total comments: {result.total_comments}")

Why multi-pass? A single prompt asking for "everything" produces shallow reviews. Specialized passes go deeper on each dimension.
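One wrinkle worth handling: independent passes sometimes flag the same line. A small merge step collapses those duplicates and records which passes agreed (a sketch; the dict keys match the JSON shape the passes return, and `flagged_by` is a field I'm adding for illustration):

```python
def dedupe_comments(comments_by_pass: dict[str, list[dict]]) -> list[dict]:
    """Merge comments that multiple passes raised on the same file/line."""
    merged: dict[tuple, dict] = {}
    for pass_name, comments in comments_by_pass.items():
        for c in comments:
            key = (c.get("file"), c.get("line"))
            if key in merged:
                # Same location already flagged: just record the extra pass
                merged[key]["flagged_by"].append(pass_name)
            else:
                merged[key] = {**c, "flagged_by": [pass_name]}
    return list(merged.values())
```

A line flagged by both the correctness and security passes is a strong signal; sorting the merged list by `len(flagged_by)` puts the highest-consensus issues first.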


Pipeline 4: The GitHub Actions Integration

Wire your review pipeline into CI so every PR gets reviewed automatically.

# .github/workflows/ai-review.yml
name: AI Code Review

on:
  pull_request:
    types: [opened, synchronize]

permissions:
  contents: read
  pull-requests: write

jobs:
  ai-review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: pip install httpx

      - name: Run AI Review
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          PR_NUMBER: ${{ github.event.pull_request.number }}
        run: python scripts/ai_review.py

And the script it calls:

#!/usr/bin/env python3
"""
AI Code Review script for GitHub Actions.
Posts review comments directly on the PR.
"""

import json
import os
import subprocess
import sys
from dataclasses import dataclass

@dataclass
class PRComment:
    path: str
    line: int
    body: str
    side: str = "RIGHT"

class GitHubReviewer:
    """Post AI review comments to GitHub PRs."""

    def __init__(self):
        self.token = os.environ["GH_TOKEN"]
        self.pr_number = int(os.environ["PR_NUMBER"])
        self.repo = os.environ.get("GITHUB_REPOSITORY", "")

    def get_diff(self) -> str:
        result = subprocess.run(
            ["git", "diff", "origin/main...HEAD"],
            capture_output=True,
            text=True,
        )
        return result.stdout

    def get_changed_files(self) -> list[str]:
        result = subprocess.run(
            ["git", "diff", "--name-only", "origin/main...HEAD"],
            capture_output=True,
            text=True,
        )
        return [f for f in result.stdout.strip().split("\n") if f]

    def review_with_llm(self, diff: str) -> list[dict]:
        """Send diff to LLM and get review comments."""
        import httpx

        prompt = f"""Review this git diff. Return a JSON object with a "comments" array.
Each comment: {{"path": "file.py", "line": 42, "body": "issue description", "severity": "critical|warning|suggestion"}}
Only include actionable, specific feedback. No generic praise.

```diff
{diff[:15000]}
```
"""

        response = httpx.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
            json={
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "response_format": {"type": "json_object"},
            },
            timeout=90,
        )

        result = response.json()["choices"][0]["message"]["content"]
        parsed = json.loads(result)
        return parsed.get("comments", parsed) if isinstance(parsed, dict) else parsed

    def post_review(self, comments: list[dict]):
        """Post a review with inline comments to the PR."""
        import httpx

        # Build PR review comments
        pr_comments = []
        for c in comments:
            severity_emoji = {
                "critical": "🚨",
                "warning": "⚠️",
                "suggestion": "💡",
            }.get(c.get("severity", "suggestion"), "💡")

            pr_comments.append({
                "path": c["path"],
                "line": c.get("line", 1),
                "body": f"{severity_emoji} **{c.get('severity', 'suggestion').upper()}**\n\n{c['body']}",
                "side": "RIGHT",
            })

        if not pr_comments:
            # Post approval
            body = {
                "event": "APPROVE",
                "body": "✅ AI Review: No issues found. LGTM!",
            }
        else:
            critical = sum(
                1 for c in comments
                if c.get("severity") == "critical"
            )
            body = {
                "event": "REQUEST_CHANGES" if critical > 0 else "COMMENT",
                "body": (
                    f"🤖 **AI Code Review**\n\n"
                    f"Found {len(comments)} issue(s) "
                    f"({critical} critical)\n\n"
                    f"---\n*Automated review by AI pipeline*"
                ),
                "comments": pr_comments,
            }

        response = httpx.post(
            f"https://api.github.com/repos/{self.repo}"
            f"/pulls/{self.pr_number}/reviews",
            headers={
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/vnd.github.v3+json",
            },
            json=body,
            timeout=30,
        )

        if response.status_code in (200, 201):
            print(f"✅ Review posted: {len(pr_comments)} comments")
        else:
            print(f"❌ Failed to post review: {response.status_code}")
            print(response.text)
            sys.exit(1)


def main():
    reviewer = GitHubReviewer()

    print("📥 Getting diff...")
    diff = reviewer.get_diff()

    if not diff.strip():
        print("No changes to review")
        return

    print(f"📝 Reviewing {len(diff)} chars of diff...")
    comments = reviewer.review_with_llm(diff)

    print(f"📤 Posting {len(comments)} comments...")
    reviewer.post_review(comments)


if __name__ == "__main__":
    main()

Setup time: 15 minutes. Copy the files, add your API key as a GitHub secret, done.
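When the job fails mysteriously, it's almost always a missing secret. A preflight check you could drop at the top of `main()` helps; the variable names match the workflow above (`GITHUB_REPOSITORY` is set automatically by Actions):

```python
import os

# Names match the env block in the workflow above
REQUIRED_ENV = ("OPENAI_API_KEY", "GH_TOKEN", "PR_NUMBER", "GITHUB_REPOSITORY")

def check_env() -> list[str]:
    """Return the names of any required variables that are missing or empty."""
    return [name for name in REQUIRED_ENV if not os.environ.get(name)]

missing = check_env()
if missing:
    print(f"❌ Missing environment variables: {', '.join(missing)}")
    # In CI you'd sys.exit(1) here; left out so the snippet runs anywhere
```

Failing fast with an explicit variable name beats a `KeyError` traceback buried in the Actions log.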


Pipeline 5: The Multi-Agent Review Board

The most sophisticated pipeline. Multiple specialized AI "reviewers" collaborate, debate, and produce a consensus review.
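The interesting part is how individual votes become a decision. Before reading the moderator prompt below, here's the decision rule in miniature (an illustrative simplification: the actual moderator agent weighs reasoning and synthesizes findings, not just vote counts):

```python
from collections import Counter

def tally_votes(votes: dict[str, str]) -> str:
    """Naive consensus rule: any single 'block' vote blocks
    (security-style veto); otherwise the most common vote carries."""
    if "block" in votes.values():
        return "block"
    winner, _ = Counter(votes.values()).most_common(1)[0]
    return winner
```

So `{"Sam": "block", "Alex": "approve"}` blocks, while three approvals against one `request_changes` approves. The moderator exists precisely because this mechanical rule can't resolve *why* reviewers disagree.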

import json
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum

class ReviewerRole(Enum):
    ARCHITECT = "architect"
    SECURITY = "security"
    PERFORMANCE = "performance"
    DX = "developer_experience"
    MODERATOR = "moderator"

@dataclass
class ReviewerAgent:
    """A specialized code reviewer agent."""
    role: ReviewerRole
    name: str
    system_prompt: str
    temperature: float = 0.2
    model: str = "gpt-4o"

@dataclass
class ReviewDebateRound:
    """One round of the review debate."""
    round_number: int
    contributions: dict[str, str]  # role -> comment

@dataclass
class ConsensusReview:
    """Final consensus from the review board."""
    decision: str  # approve, request_changes, block
    confidence: float
    key_findings: list[dict]
    debate_rounds: list[ReviewDebateRound]
    dissenting_opinions: list[str]
    final_summary: str

class ReviewBoard:
    """
    Multi-agent code review system.
    Specialized agents review code, then debate
    and reach consensus through a moderator.
    """

    DEFAULT_AGENTS = [
        ReviewerAgent(
            role=ReviewerRole.ARCHITECT,
            name="Alex (Architecture)",
            system_prompt="""You are Alex, a senior software architect.
You focus on:
- System design implications of code changes
- API contracts and backward compatibility
- Dependency management and coupling
- Scalability patterns
Speak concisely. Back claims with specifics.""",
        ),
        ReviewerAgent(
            role=ReviewerRole.SECURITY,
            name="Sam (Security)",
            system_prompt="""You are Sam, a security engineer.
You focus on:
- OWASP Top 10 vulnerabilities
- Authentication and authorization flaws
- Data exposure and privacy
- Supply chain security
You are cautious. Flag anything suspicious.""",
        ),
        ReviewerAgent(
            role=ReviewerRole.PERFORMANCE,
            name="Pat (Performance)",
            system_prompt="""You are Pat, a performance engineer.
You focus on:
- Algorithmic complexity
- Database query patterns
- Memory usage and leaks
- Caching opportunities
- Concurrency issues
Only flag measurable performance impacts.""",
        ),
        ReviewerAgent(
            role=ReviewerRole.DX,
            name="Dana (Developer Experience)",
            system_prompt="""You are Dana, a developer experience advocate.
You focus on:
- Code readability and clarity
- Documentation quality
- Test coverage
- Error messages and debugging
- Onboarding-friendliness
You represent future developers reading this code.""",
        ),
    ]

    MODERATOR = ReviewerAgent(
        role=ReviewerRole.MODERATOR,
        name="Morgan (Moderator)",
        system_prompt="""You are Morgan, the review board moderator.
Your job:
1. Synthesize feedback from all reviewers
2. Resolve disagreements with reasoning
3. Produce a final, actionable review
4. Make the approve/block/request_changes decision

Be fair. Weight security concerns highest.
Note any dissenting opinions.""",
        model="gpt-4o",
    )

    INDIVIDUAL_REVIEW_PROMPT = """## Your Review

Review this code change from your expert perspective.

```
{code}
```

Previous reviewers said:
{previous_reviews}

Provide your unique perspective. Don't repeat what others said.
Return JSON:
{{
    "findings": [
        {{
            "severity": "critical|high|medium|low",
            "title": "short title",
            "description": "detailed explanation",
            "file": "filename (if applicable)",
            "line": 0,
            "suggested_fix": "optional"
        }}
    ],
    "vote": "approve|request_changes|block",
    "confidence": 0.0-1.0,
    "summary": "one paragraph"
}}"""

    CONSENSUS_PROMPT = """## Build Consensus

You are the moderator. Here are all reviews:

{all_reviews}

Code under review:
```
{code}
```

Synthesize into a final review. Resolve disagreements.
Return JSON:
{{
    "decision": "approve|request_changes|block",
    "confidence": 0.0-1.0,
    "key_findings": [
        {{
            "severity": "critical|high|medium|low",
            "title": "finding title",
            "description": "synthesized description",
            "agreed_by": ["reviewer names"],
            "remediation": "how to fix"
        }}
    ],
    "dissenting_opinions": ["any unresolved disagreements"],
    "summary": "final assessment paragraph"
}}"""

    def __init__(
        self,
        agents: list[ReviewerAgent] | None = None,
        llm_fn: Callable[[str, str, float], str] | None = None,
    ):
        self.agents = agents or self.DEFAULT_AGENTS
        self.moderator = self.MODERATOR
        self.llm_fn = llm_fn or self._default_llm

    def _default_llm(
        self, prompt: str, model: str = "gpt-4o", temp: float = 0.2
    ) -> str:
        import httpx
        import os

        response = httpx.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": temp,
                "response_format": {"type": "json_object"},
            },
            timeout=90,
        )
        response.raise_for_status()  # surface API errors instead of a cryptic KeyError
        return response.json()["choices"][0]["message"]["content"]

    def _individual_review(
        self,
        agent: ReviewerAgent,
        code: str,
        previous: str = "None yet — you're first.",
    ) -> dict:
        """Get a review from a single agent."""
        prompt = (
            f"{agent.system_prompt}\n\n"
            + self.INDIVIDUAL_REVIEW_PROMPT.format(
                code=code, previous_reviews=previous
            )
        )
        raw = self.llm_fn(prompt, agent.model, agent.temperature)
        result = json.loads(raw)
        result["reviewer"] = agent.name
        return result

    def review(self, code: str, rounds: int = 1) -> ConsensusReview:
        """
        Run the full review board process.
        1. Each agent reviews independently
        2. Optional debate rounds
        3. Moderator synthesizes consensus
        """
        debate_log: list[ReviewDebateRound] = []

        # Round 1: Independent reviews
        print("🔍 Round 1: Independent reviews...")
        reviews = {}
        previous = "None yet — you're first."

        for agent in self.agents:
            print(f"  📝 {agent.name} reviewing...")
            result = self._individual_review(agent, code, previous)
            reviews[agent.name] = result
            # Each subsequent reviewer sees previous reviews
            previous = json.dumps(
                {name: r.get("summary", "") for name, r in reviews.items()},
                indent=2,
            )

        debate_log.append(
            ReviewDebateRound(
                round_number=1,
                contributions={
                    name: r.get("summary", "")
                    for name, r in reviews.items()
                },
            )
        )

        # Additional debate rounds (optional)
        for round_num in range(2, rounds + 1):
            print(f"💬 Round {round_num}: Debate...")
            round_contributions = {}
            for agent in self.agents:
                result = self._individual_review(
                    agent, code, json.dumps(reviews, indent=2)
                )
                reviews[agent.name] = result
                round_contributions[agent.name] = result.get("summary", "")

            debate_log.append(
                ReviewDebateRound(
                    round_number=round_num,
                    contributions=round_contributions,
                )
            )

        # Moderator consensus
        print("⚖️  Moderator building consensus...")
        all_reviews_str = json.dumps(reviews, indent=2)
        consensus_prompt = (
            f"{self.moderator.system_prompt}\n\n"
            + self.CONSENSUS_PROMPT.format(
                all_reviews=all_reviews_str, code=code
            )
        )
        raw = self.llm_fn(
            consensus_prompt,
            self.moderator.model,
            self.moderator.temperature,
        )
        consensus = json.loads(raw)

        return ConsensusReview(
            decision=consensus["decision"],
            confidence=consensus.get("confidence", 0.5),
            key_findings=consensus.get("key_findings", []),
            debate_rounds=debate_log,
            dissenting_opinions=consensus.get("dissenting_opinions", []),
            final_summary=consensus.get("summary", ""),
        )


# --- Usage ---

def run_review_board():
    board = ReviewBoard()

    # Read the code to review
    import subprocess
    diff = subprocess.run(
        ["git", "diff", "main...HEAD"],
        capture_output=True, text=True, check=True,
    ).stdout

    if not diff:
        print("No changes to review")
        return

    result = board.review(diff, rounds=1)

    print("\n" + "=" * 60)
    print(f"📊 REVIEW BOARD DECISION: {result.decision.upper()}")
    print(f"   Confidence: {result.confidence:.0%}")
    print("=" * 60)
    print(f"\n{result.final_summary}")

    if result.key_findings:
        print(f"\n📋 Key Findings ({len(result.key_findings)}):")
        for f in result.key_findings:
            emoji = {"critical": "🚨", "high": "🔴", "medium": "🟡", "low": "🟢"}.get(
                f.get("severity", "low"), ""
            )
            print(f"  {emoji} [{f['severity']}] {f['title']}")
            print(f"     {f['description'][:100]}")

    if result.dissenting_opinions:
        print(f"\n⚡ Dissenting opinions:")
        for opinion in result.dissenting_opinions:
            print(f"  - {opinion}")


if __name__ == "__main__":
    run_review_board()

Cost: ~$0.20-0.50 per review (4 agents + moderator). Worth it for critical PRs.
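Because `ReviewBoard` accepts an injectable `llm_fn`, you can stub the model and test the plumbing without burning tokens. A minimal sketch (the `fake_llm` function and its canned verdict are hypothetical, not part of the pipeline above):

```python
import json

# Hypothetical stub matching the llm_fn signature used above:
# (prompt, model, temperature) -> JSON string.
def fake_llm(prompt: str, model: str, temperature: float) -> str:
    # Always returns a valid consensus payload, which is enough to
    # exercise the parsing and reporting code paths offline.
    return json.dumps({
        "decision": "approve",
        "confidence": 0.9,
        "key_findings": [],
        "dissenting_opinions": [],
        "summary": "Stubbed review for testing.",
    })

# In the real pipeline: board = ReviewBoard(llm_fn=fake_llm)
consensus = json.loads(fake_llm("any prompt", "gpt-4o", 0.2))
print(consensus["decision"])  # → approve
```

Swapping `llm_fn` is also the hook for pointing the board at a local model or a different provider.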


Choosing the Right Pipeline

| Pipeline | Complexity | Cost/PR | Best For |
| --- | --- | --- | --- |
| PR Diff Analyzer | Low | $0.02-0.10 | Every PR |
| Security Scanner | Low | ~$0 (static) | Security-critical repos |
| Multi-Pass | Medium | $0.10-0.30 | Important features |
| GitHub Actions | Medium | $0.02-0.10 | Full automation |
| Review Board | High | $0.20-0.50 | Critical systems |

My recommendation: Start with Pipeline 1 + Pipeline 2. Add Pipeline 4 for CI integration. Use Pipeline 5 for high-stakes releases.


Common Pitfalls to Avoid

1. Reviewing everything with the most expensive pipeline

# ❌ Don't do this
def review_pr(diff):
    return expensive_multi_agent_review(diff)

# ✅ Do this instead
def review_pr(diff, labels: list[str]):
    if "critical" in labels or "security" in labels:
        return multi_agent_review(diff)
    elif len(diff) > 5000:
        return multi_pass_review(diff)
    else:
        return simple_diff_review(diff)

2. Not filtering noise

# Filter out auto-generated files, lockfiles, etc.
IGNORE_PATTERNS = [
    "*.lock",
    "*.min.js",
    "*.generated.*",
    "package-lock.json",
    "yarn.lock",
    "__pycache__/",
    ".pyc",
    "migrations/",
]

def should_review(filepath: str) -> bool:
    from fnmatch import fnmatch
    return not any(
        fnmatch(filepath, pattern)
        for pattern in IGNORE_PATTERNS
    )
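Wiring the filter into a pipeline is a one-liner over `git diff --name-only`. A sketch (restating a compact `should_review` so the snippet runs standalone; the file list is illustrative):

```python
from fnmatch import fnmatch

IGNORE_PATTERNS = ["*.lock", "*.min.js", "*__pycache__/*", "*.pyc"]

def should_review(filepath: str) -> bool:
    # fnmatch matches the whole path, so wildcards cover nested files
    return not any(fnmatch(filepath, p) for p in IGNORE_PATTERNS)

# In CI you would get this list from:
#   git diff --name-only main...HEAD
changed_files = [
    "src/api/handlers.py",
    "poetry.lock",
    "static/app.min.js",
    "src/__pycache__/handlers.pyc",
]

reviewable = [f for f in changed_files if should_review(f)]
print(reviewable)  # → ['src/api/handlers.py']
```

Filtering before the LLM call cuts both cost and noise: lockfile churn never reaches the reviewer.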

3. Not tracking review quality

# Track whether AI reviews were helpful
@dataclass
class ReviewFeedback:
    pr_number: int
    ai_comments: int
    helpful_comments: int  # Developer marked as useful
    false_positives: int   # Developer dismissed
    missed_issues: int     # Human found what AI missed

    @property
    def precision(self) -> float:
        total = self.helpful_comments + self.false_positives
        return self.helpful_comments / max(total, 1)

    @property
    def signal_to_noise(self) -> float:
        return self.helpful_comments / max(self.ai_comments, 1)
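To see the metrics in action, here is the dataclass above fed with hypothetical numbers (the PR number and feedback tallies are invented for illustration; the class is restated so the snippet runs standalone):

```python
from dataclasses import dataclass

@dataclass
class ReviewFeedback:
    pr_number: int
    ai_comments: int
    helpful_comments: int
    false_positives: int
    missed_issues: int

    @property
    def precision(self) -> float:
        total = self.helpful_comments + self.false_positives
        return self.helpful_comments / max(total, 1)

    @property
    def signal_to_noise(self) -> float:
        return self.helpful_comments / max(self.ai_comments, 1)

# Hypothetical week: 20 AI comments, 8 marked helpful, 5 dismissed
fb = ReviewFeedback(
    pr_number=142, ai_comments=20,
    helpful_comments=8, false_positives=5, missed_issues=2,
)
print(f"precision: {fb.precision:.0%}")           # → precision: 62%
print(f"signal/noise: {fb.signal_to_noise:.0%}")  # → signal/noise: 40%
```

If signal-to-noise drifts down over time, developers are tuning the bot out; tighten the prompts or raise the severity floor.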

Quick Start: Ship Your First AI Review in 30 Minutes

  1. Copy Pipeline 1 into scripts/ai_review.py
  2. Set your API key: export OPENAI_API_KEY=your-key
  3. Test locally:
   git diff main...HEAD | python scripts/ai_review.py
  4. Add to CI with Pipeline 4's GitHub Action
  5. Iterate based on developer feedback

The first version won't be perfect. That's fine. The goal is to start catching obvious issues automatically, then improve over time.


Key Takeaways

  1. Start with simple diff analysis — it catches more than you'd expect
  2. Static + AI = best combo — Use regex for known patterns, LLMs for context
  3. Specialize your reviewers — Multi-pass beats single-prompt every time
  4. Automate in CI — Reviews only work if they're automatic
  5. Track precision — If developers ignore your AI reviews, they're noise

The best code review is the one that actually happens. AI makes that possible for every single PR.


Want pre-built review pipeline templates with GitHub Actions configs and customizable review passes? Check out the AI Dev Toolkit — ship reliable code, faster.
