Your Code Reviews Are a Bottleneck
Every engineering team has the same problem: PRs sit in review queues for hours (or days). Senior engineers spend 30% of their time reviewing code. And half the feedback is stuff a machine could catch.
AI code review isn't about replacing human reviewers. It's about letting humans focus on architecture and design while AI handles the mechanical stuff.
Here are 5 pipelines you can build this weekend, starting from dead-simple and scaling up.
Pipeline 1: The PR Diff Analyzer
The simplest useful pipeline. Feed a PR diff to an LLM and get structured feedback.
import json
import subprocess
from dataclasses import dataclass, field
from typing import Any
@dataclass
class ReviewComment:
    """A single review comment on a code change."""

    # Path of the file the comment applies to, as it appears in the diff.
    file: str
    # 1-based line number in the new version of the file.
    line: int
    severity: str  # "critical", "warning", "suggestion", "nitpick"
    category: str  # "bug", "security", "performance", "style", "logic"
    # Human-readable description of the issue.
    message: str
    # Concrete fix (code or prose); None when the model offered none.
    suggested_fix: str | None = None
@dataclass
class ReviewResult:
    """Complete review result for a PR."""

    # PR number the review belongs to (0 when reviewing a local diff).
    pr_number: int
    total_files: int
    total_comments: int
    # Number of comments with severity == "critical".
    critical_count: int
    comments: list[ReviewComment] = field(default_factory=list)
    # Concatenated per-chunk summaries from the LLM.
    summary: str = ""
    # False as soon as any critical comment exists.
    approve: bool = True
class PRDiffAnalyzer:
    """
    Analyze git diffs and produce structured code review feedback.

    Works with any LLM that supports chat completions: inject a callable
    ``prompt -> str`` as ``llm_client``, or override :meth:`_call_llm`.
    """

    # Per-chunk prompt template. Doubled braces escape the literal JSON
    # braces in the schema; only ``{diff}`` is substituted.
    REVIEW_PROMPT = """You are an expert code reviewer. Analyze this git diff and provide structured feedback.
## Diff
{diff}
## Review Guidelines
- Focus on bugs, security issues, and logic errors
- Flag performance problems with concrete impact
- Note missing error handling
- Suggest improvements, not just problems
- Be specific: reference file names and line numbers
## Output Format
Respond with a JSON object:
{{
"comments": [
{{
"file": "path/to/file.py",
"line": 42,
"severity": "critical|warning|suggestion|nitpick",
"category": "bug|security|performance|style|logic",
"message": "Description of the issue",
"suggested_fix": "Code or description of fix (optional)"
}}
],
"summary": "One paragraph overall assessment",
"approve": true/false
}}"""

    def __init__(self, llm_client: Any = None, model: str = "gpt-4o"):
        # Optional callable ``prompt -> raw JSON string``. Previously this
        # was stored but never consulted; _call_llm now honors it.
        self.llm_client = llm_client
        self.model = model

    def get_diff(
        self, base_branch: str = "main", head_branch: str = "HEAD"
    ) -> str:
        """Get the git diff between two branches (three-dot: changes on
        head since it diverged from base)."""
        result = subprocess.run(
            ["git", "diff", f"{base_branch}...{head_branch}"],
            capture_output=True,
            text=True,
        )
        # NOTE: returns "" on git errors (returncode is not checked);
        # callers treat an empty diff as "nothing to review".
        return result.stdout

    def get_pr_diff(self, pr_number: int) -> str:
        """Get the diff for a GitHub PR using the ``gh`` CLI."""
        result = subprocess.run(
            ["gh", "pr", "diff", str(pr_number)],
            capture_output=True,
            text=True,
        )
        return result.stdout

    def _chunk_diff(self, diff: str, max_chars: int = 12000) -> list[str]:
        """Split a large diff into chunks of roughly ``max_chars``.

        Chunks break only on file boundaries, and every file section keeps
        its ``diff --git `` header. (The previous version dropped the
        header from the first file of every chunk after the first, handing
        the LLM malformed diff text.)
        """
        sections = [
            f"diff --git {part}"
            for part in diff.split("diff --git ")
            if part.strip()
        ]
        chunks: list[str] = []
        current = ""
        for section in sections:
            if current and len(current) + len(section) > max_chars:
                chunks.append(current)
                current = section
            elif current:
                current += f"\n{section}"
            else:
                # First section of a chunk; an oversized single file
                # simply becomes its own chunk.
                current = section
        if current:
            chunks.append(current)
        return chunks

    def _call_llm(self, prompt: str) -> str:
        """Call the LLM for a review.

        Uses the injected ``llm_client`` when one was provided (any
        callable ``prompt -> str``); otherwise calls the OpenAI
        chat-completions API over HTTP. Override for other providers.
        """
        if callable(self.llm_client):
            return self.llm_client(prompt)
        import httpx
        import os

        response = httpx.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
            json={
                "model": self.model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,  # low temperature: near-deterministic reviews
                "response_format": {"type": "json_object"},
            },
            timeout=60,
        )
        return response.json()["choices"][0]["message"]["content"]

    def review_diff(self, diff: str) -> "ReviewResult":
        """Review a complete diff, chunking it when it is too large.

        Raises json.JSONDecodeError if the model returns malformed JSON,
        and TypeError if a comment dict carries unexpected keys.
        """
        chunks = self._chunk_diff(diff)
        all_comments: list["ReviewComment"] = []
        summaries = []
        for chunk in chunks:
            prompt = self.REVIEW_PROMPT.format(diff=chunk)
            raw = self._call_llm(prompt)
            parsed = json.loads(raw)
            for c in parsed.get("comments", []):
                # Assumes the model emits exactly the documented keys.
                all_comments.append(ReviewComment(**c))
            summaries.append(parsed.get("summary", ""))
        critical_count = sum(
            1 for c in all_comments if c.severity == "critical"
        )
        return ReviewResult(
            pr_number=0,  # unknown at this level; caller may overwrite
            # Count actual file sections, not chunks — a single chunk can
            # hold several files (the old code reported len(chunks)).
            total_files=diff.count("diff --git "),
            total_comments=len(all_comments),
            critical_count=critical_count,
            comments=all_comments,
            summary=" ".join(summaries),
            approve=critical_count == 0,
        )
# --- Usage ---
# Requires OPENAI_API_KEY in the environment (see _call_llm).
analyzer = PRDiffAnalyzer(model="gpt-4o")
# Review current branch against main
# diff = analyzer.get_diff("main", "HEAD")
# result = analyzer.review_diff(diff)
# print(f"Comments: {result.total_comments}, Critical: {result.critical_count}")
# print(f"Approve: {'✅' if result.approve else '❌'}")
Cost: ~$0.02-0.10 per PR depending on diff size.
Pipeline 2: Security-Focused Scanner
A specialized pipeline that focuses exclusively on security vulnerabilities.
import re
import json
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any
class VulnSeverity(Enum):
    """Severity of a security finding.

    Declaration order (most → least severe) matters: report sorting uses
    each member's position in ``list(VulnSeverity)``.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
@dataclass
class SecurityFinding:
    """A security vulnerability finding."""

    file: str
    # 1-based line number where the pattern matched.
    line: int
    severity: VulnSeverity
    # Common Weakness Enumeration identifier, e.g. "CWE-89".
    cwe_id: str
    title: str
    description: str
    # Suggested fix text for the matched vulnerability class.
    remediation: str
    confidence: float  # 0.0 to 1.0

    @property
    def is_blocking(self) -> bool:
        """True for severities that should fail the build (critical/high)."""
        return self.severity in (VulnSeverity.CRITICAL, VulnSeverity.HIGH)
class SecurityScanner:
    """
    Combines static pattern matching with LLM-powered
    contextual security analysis.

    The scanner is stateful: every ``scan_*`` call appends to
    ``self.findings`` so one report can aggregate several scans.
    """

    # Static regex heuristics for common vulnerability classes. These
    # favor recall over precision (hence the fixed 0.7 confidence).
    VULN_PATTERNS = {
        "sql_injection": {
            "pattern": r"""(?x)
            (?:execute|cursor\.execute|raw|RawSQL)\s*\(
            \s*f['""] |
            (?:execute|cursor\.execute)\s*\(\s*
            ['""].*?%s.*?['""].*?% |
            \.format\s*\(.*?\).*?(?:execute|query)
            """,
            "cwe": "CWE-89",
            "severity": VulnSeverity.CRITICAL,
            "title": "Potential SQL Injection",
        },
        "hardcoded_secret": {
            "pattern": r"""(?x)
            (?:password|secret|token|api_key|apikey|
            private_key|auth_token)\s*=\s*
            ['""][^'""]{8,}['""]
            """,
            "cwe": "CWE-798",
            "severity": VulnSeverity.HIGH,
            "title": "Hardcoded Secret",
        },
        "command_injection": {
            "pattern": r"""(?x)
            (?:os\.system|os\.popen|subprocess\.call|
            subprocess\.Popen|subprocess\.run)\s*\(
            \s*(?:f['""]|.*?\.format|.*?\+\s*(?!['""]))
            """,
            "cwe": "CWE-78",
            "severity": VulnSeverity.CRITICAL,
            "title": "Potential Command Injection",
        },
        "path_traversal": {
            "pattern": r"""(?x)
            (?:open|Path)\s*\(\s*
            (?:request\.|user_input|params|
            .*?\+\s*(?:request|input))
            """,
            "cwe": "CWE-22",
            "severity": VulnSeverity.HIGH,
            "title": "Potential Path Traversal",
        },
        "insecure_deserialization": {
            "pattern": r"""(?x)
            (?:pickle\.loads?|yaml\.(?:load|unsafe_load)|
            marshal\.loads?)\s*\(
            """,
            "cwe": "CWE-502",
            "severity": VulnSeverity.HIGH,
            "title": "Insecure Deserialization",
        },
        "weak_crypto": {
            "pattern": r"""(?x)
            (?:hashlib\.(?:md5|sha1)|
            DES\.|Blowfish|RC4|
            random\.(?:random|randint|choice))
            """,
            "cwe": "CWE-327",
            "severity": VulnSeverity.MEDIUM,
            "title": "Weak Cryptography",
        },
        "ssrf": {
            "pattern": r"""(?x)
            (?:requests\.(?:get|post|put|delete|patch)|
            httpx\.(?:get|post|put)|
            urllib\.request\.urlopen)\s*\(
            \s*(?:f['""]|.*?\.format|.*?\+)
            """,
            "cwe": "CWE-918",
            "severity": VulnSeverity.HIGH,
            "title": "Potential SSRF",
        },
    }

    def __init__(self):
        # Findings accumulated across all scan_* calls.
        self.findings: list[SecurityFinding] = []
        # Compile each pattern once; the scan loops test every pattern
        # against every line, so per-line re.search(str) lookups add up.
        self._compiled = {
            name: re.compile(cfg["pattern"], re.IGNORECASE)
            for name, cfg in self.VULN_PATTERNS.items()
        }

    def scan_file(self, file_path: str, content: str) -> list[SecurityFinding]:
        """Scan one file's text; return (and record) its findings."""
        findings = []
        lines = content.split("\n")
        for vuln_name, config in self.VULN_PATTERNS.items():
            pattern = self._compiled[vuln_name]
            for i, line in enumerate(lines, 1):
                # Skip whole-line comments (Python and C-style only).
                stripped = line.strip()
                if stripped.startswith("#") or stripped.startswith("//"):
                    continue
                if pattern.search(line):
                    finding = SecurityFinding(
                        file=file_path,
                        line=i,
                        severity=config["severity"],
                        cwe_id=config["cwe"],
                        title=config["title"],
                        description=f"Found {vuln_name} pattern in: {line.strip()[:100]}",
                        remediation=self._get_remediation(vuln_name),
                        confidence=0.7,  # heuristic match, not confirmed
                    )
                    findings.append(finding)
        self.findings.extend(findings)
        return findings

    def scan_directory(
        self,
        directory: str,
        extensions: list[str] | None = None,
    ) -> list[SecurityFinding]:
        """Recursively scan every matching file under ``directory``."""
        extensions = extensions or [".py", ".js", ".ts", ".go", ".rb", ".java"]
        all_findings = []
        for path in Path(directory).rglob("*"):
            if path.suffix in extensions and path.is_file():
                # errors="ignore": skip undecodable bytes rather than
                # aborting the whole scan on one binary-ish file.
                content = path.read_text(errors="ignore")
                findings = self.scan_file(str(path), content)
                all_findings.extend(findings)
        return all_findings

    def scan_diff(self, diff: str) -> list[SecurityFinding]:
        """Scan only the lines a unified diff adds.

        Tracks the new-file line number from each ``@@`` hunk header.
        Bug fix: only context (" ") and added ("+") lines advance the
        counter — metadata lines such as ``diff --git``, ``index`` and
        ``\\ No newline at end of file`` previously inflated it, drifting
        the reported line numbers.
        """
        findings = []
        current_file = ""
        current_line = 0
        for line in diff.split("\n"):
            if line.startswith("+++ b/"):
                current_file = line[6:]
            elif line.startswith("@@ "):
                # New-file start from the hunk header "@@ -a,b +c,d @@".
                match = re.search(r"\+(\d+)", line)
                if match:
                    current_line = int(match.group(1)) - 1
            elif line.startswith("+") and not line.startswith("+++"):
                current_line += 1
                added_line = line[1:]
                for vuln_name, config in self.VULN_PATTERNS.items():
                    if self._compiled[vuln_name].search(added_line):
                        findings.append(
                            SecurityFinding(
                                file=current_file,
                                line=current_line,
                                severity=config["severity"],
                                cwe_id=config["cwe"],
                                title=config["title"],
                                description=(
                                    f"New code introduces {vuln_name}: "
                                    f"{added_line.strip()[:100]}"
                                ),
                                remediation=self._get_remediation(vuln_name),
                                confidence=0.7,
                            )
                        )
            elif line.startswith(" ") or line == "":
                # Context line (possibly with its leading space stripped
                # in transit) — advances the new-file counter.
                current_line += 1
        self.findings.extend(findings)
        return findings

    def _get_remediation(self, vuln_name: str) -> str:
        """Return a short remediation hint for a known pattern name."""
        remediations = {
            "sql_injection": "Use parameterized queries or an ORM. Never interpolate user input into SQL strings.",
            "hardcoded_secret": "Move secrets to environment variables or a secrets manager (AWS Secrets Manager, Vault, etc).",
            "command_injection": "Use subprocess with a list of arguments instead of shell=True. Validate and sanitize all inputs.",
            "path_traversal": "Use pathlib.resolve() and check the resolved path starts with your allowed base directory.",
            "insecure_deserialization": "Use json.loads() instead of pickle/yaml.load(). If pickle is necessary, use hmac verification.",
            "weak_crypto": "Use hashlib.sha256 or bcrypt for passwords. Use secrets.token_hex() for random tokens.",
            "ssrf": "Validate URLs against an allowlist. Block internal/private IP ranges. Use a URL parser to verify the hostname.",
        }
        return remediations.get(vuln_name, "Review and fix the identified vulnerability.")

    def get_report(self) -> dict:
        """Summarize all accumulated findings as a JSON-safe dict."""
        blocking = [f for f in self.findings if f.is_blocking]
        return {
            "total_findings": len(self.findings),
            "blocking": len(blocking),
            "by_severity": {
                s.value: sum(1 for f in self.findings if f.severity == s)
                for s in VulnSeverity
            },
            "should_block": len(blocking) > 0,
            "findings": [
                {
                    "file": f.file,
                    "line": f.line,
                    "severity": f.severity.value,
                    "cwe": f.cwe_id,
                    "title": f.title,
                    "description": f.description,
                    "remediation": f.remediation,
                }
                # Most severe first: key is the member's position in the
                # enum's declaration order (CRITICAL..INFO).
                for f in sorted(
                    self.findings,
                    key=lambda x: list(VulnSeverity).index(x.severity),
                )
            ],
        }
# --- Usage ---
scanner = SecurityScanner()
# Scan a diff (the commented lines assume `import subprocess` / `import sys`
# at the top of the module)
# diff = subprocess.run(["git", "diff", "main...HEAD"], capture_output=True, text=True).stdout
# findings = scanner.scan_diff(diff)
# Or scan a directory
# findings = scanner.scan_directory("./src")
# Get report
# report = scanner.get_report()
# if report["should_block"]:
#     print(f"🚨 {report['blocking']} blocking security issues found!")
#     sys.exit(1)
Pipeline 3: The Multi-Pass Reviewer
Different aspects of code need different review strategies. This pipeline runs multiple specialized passes.
import json
from dataclasses import dataclass, field
from typing import Any, Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class ReviewPass:
    """A single specialized review pass."""

    name: str
    # Persona/instructions prepended to the per-pass prompt.
    system_prompt: str
    # Tags describing what the pass covers (appears unused by the
    # reviewer itself here — documentation for humans).
    focus_areas: list[str]
    # Multiplier applied to this pass's score when averaging
    # (e.g. security weighs more than maintainability).
    severity_weight: float = 1.0
@dataclass
class MultiPassResult:
    """Combined results from all review passes."""

    passes_completed: int
    total_comments: int
    # Weighted average of per-pass scores (1-10 scale), rounded to 1 dp.
    weighted_score: float
    # Pass name -> list of raw comment dicts from that pass.
    comments_by_pass: dict[str, list[dict]]
    # One "**pass**: summary" line per completed pass.
    summary: str
    decision: str  # "approve", "request_changes", "block"
class MultiPassReviewer:
    """
    Run multiple specialized review passes over the same code.

    Each pass focuses on a different quality dimension; results are
    combined into a severity-weighted score and a final decision.
    """

    DEFAULT_PASSES = [
        ReviewPass(
            name="correctness",
            system_prompt="""You are a correctness-focused code reviewer.
Look ONLY for:
- Logic errors and bugs
- Off-by-one errors
- Null/undefined handling
- Race conditions
- Edge cases not handled
Do NOT comment on style, naming, or performance.""",
            focus_areas=["bugs", "logic", "edge_cases"],
            severity_weight=3.0,
        ),
        ReviewPass(
            name="security",
            system_prompt="""You are a security-focused code reviewer.
Look ONLY for:
- Injection vulnerabilities (SQL, command, XSS)
- Authentication/authorization flaws
- Data exposure risks
- Insecure dependencies
- Missing input validation
Do NOT comment on style or performance.""",
            focus_areas=["security", "auth", "injection"],
            severity_weight=5.0,
        ),
        ReviewPass(
            name="performance",
            system_prompt="""You are a performance-focused code reviewer.
Look ONLY for:
- O(n²) or worse algorithms where O(n) exists
- Unnecessary database queries (N+1)
- Memory leaks or excessive allocations
- Missing caching opportunities
- Blocking operations in async contexts
Do NOT comment on style or correctness.""",
            focus_areas=["performance", "scalability", "efficiency"],
            severity_weight=2.0,
        ),
        ReviewPass(
            name="maintainability",
            system_prompt="""You are a maintainability-focused code reviewer.
Look ONLY for:
- Functions that are too long (>50 lines)
- Missing or misleading documentation
- Poor naming that obscures intent
- Duplicated logic that should be extracted
- Missing type hints on public APIs
Be brief. Only comment on significant issues.""",
            focus_areas=["readability", "documentation", "structure"],
            severity_weight=1.0,
        ),
    ]

    # Per-pass prompt; doubled braces escape the literal JSON schema.
    PASS_PROMPT_TEMPLATE = """{system_prompt}
## Code to Review
{code}
## Output Format
Return JSON:
{{
"comments": [
{{
"file": "filename",
"line": 0,
"issue": "description",
"severity": "critical|warning|suggestion",
"fix": "suggested fix (optional)"
}}
],
"pass_score": <1-10>,
"summary": "one sentence"
}}"""

    def __init__(
        self,
        passes: list[ReviewPass] | None = None,
        llm_fn: Callable[[str], str] | None = None,
        parallel: bool = True,
    ):
        # llm_fn signature: prompt -> raw JSON string.
        self.passes = passes or self.DEFAULT_PASSES
        self.llm_fn = llm_fn or self._default_llm
        self.parallel = parallel

    def _default_llm(self, prompt: str) -> str:
        """Default LLM call — replace with your implementation."""
        import httpx
        import os

        response = httpx.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
            json={
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "response_format": {"type": "json_object"},
            },
            timeout=60,
        )
        return response.json()["choices"][0]["message"]["content"]

    def _run_single_pass(
        self, review_pass: "ReviewPass", code: str
    ) -> tuple[str, dict]:
        """Run one pass; return (pass name, parsed JSON result).

        Raises json.JSONDecodeError if the model output is not valid JSON.
        """
        prompt = self.PASS_PROMPT_TEMPLATE.format(
            system_prompt=review_pass.system_prompt,
            code=code,
        )
        raw = self.llm_fn(prompt)
        parsed = json.loads(raw)
        return review_pass.name, parsed

    def review(self, code: str) -> "MultiPassResult":
        """Run all review passes over ``code`` and combine the results."""
        pass_results: dict[str, dict] = {}
        if self.parallel:
            # One worker per pass — passes are independent LLM calls
            # (the old hard-coded 4 throttled larger pass lists).
            with ThreadPoolExecutor(
                max_workers=len(self.passes) or 1
            ) as executor:
                futures = {
                    executor.submit(
                        self._run_single_pass, rp, code
                    ): rp.name
                    for rp in self.passes
                }
                for future in as_completed(futures):
                    name, result = future.result()
                    pass_results[name] = result
        else:
            for rp in self.passes:
                name, result = self._run_single_pass(rp, code)
                pass_results[name] = result
        # Combine and score. Only passes that actually produced a result
        # count, and the denominator matches: the old code divided by the
        # total weight of ALL passes, skewing the average low whenever a
        # pass result was missing.
        all_comments: dict[str, list[dict]] = {}
        total_comments = 0
        weighted_scores = []
        completed_weight = 0.0
        for rp in self.passes:
            result = pass_results.get(rp.name)
            if result is None:
                continue
            comments = result.get("comments", [])
            all_comments[rp.name] = comments
            total_comments += len(comments)
            score = result.get("pass_score", 5)  # neutral default
            weighted_scores.append(score * rp.severity_weight)
            completed_weight += rp.severity_weight
        weighted_avg = (
            sum(weighted_scores) / completed_weight
            if completed_weight
            else 5.0
        )
        # Decision: any critical comment blocks outright; otherwise a
        # below-midpoint weighted score requests changes.
        has_critical = any(
            c.get("severity") == "critical"
            for comments in all_comments.values()
            for c in comments
        )
        if has_critical:
            decision = "block"
        elif weighted_avg < 5.0:
            decision = "request_changes"
        else:
            decision = "approve"
        summaries = [
            f"**{name}**: {pass_results[name].get('summary', 'N/A')}"
            for name in pass_results
        ]
        return MultiPassResult(
            passes_completed=len(pass_results),
            total_comments=total_comments,
            weighted_score=round(weighted_avg, 1),
            comments_by_pass=all_comments,
            summary="\n".join(summaries),
            decision=decision,
        )
# --- Usage ---
# reviewer = MultiPassReviewer(parallel=True)
# result = reviewer.review(code_to_review)
# print(f"Decision: {result.decision}")
# print(f"Score: {result.weighted_score}/10")
# print(f"Total comments: {result.total_comments}")
Why multi-pass? A single prompt asking for "everything" produces shallow reviews. Specialized passes go deeper on each dimension.
Pipeline 4: The GitHub Actions Integration
Wire your review pipeline into CI so every PR gets reviewed automatically.
# .github/workflows/ai-review.yml
# Runs the AI review script on PR open and on every new push to the PR,
# posting review comments via the repo-scoped GITHUB_TOKEN.
name: AI Code Review
on:
  pull_request:
    types: [opened, synchronize]  # new PRs and subsequent pushes
# Least-privilege token: read the code, write PR reviews only.
permissions:
  contents: read
  pull-requests: write
jobs:
  ai-review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # full history so "git diff origin/main...HEAD" works
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install httpx
      - name: Run AI Review
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          PR_NUMBER: ${{ github.event.pull_request.number }}
        run: python scripts/ai_review.py
And the script it calls:
#!/usr/bin/env python3
"""
AI Code Review script for GitHub Actions.
Posts review comments directly on the PR.
"""
import json
import os
import subprocess
import sys
from dataclasses import dataclass
@dataclass
class PRComment:
    """An inline PR review comment payload for the GitHub API.

    NOTE(review): currently unused — GitHubReviewer builds plain dicts
    instead. Kept as documentation of the expected payload shape.
    """

    path: str
    line: int
    body: str
    # GitHub review API: comment on the new ("RIGHT") side of the diff.
    side: str = "RIGHT"
class GitHubReviewer:
    """Post AI review comments to GitHub PRs.

    Expects GH_TOKEN, PR_NUMBER and GITHUB_REPOSITORY in the environment
    (all provided by the GitHub Actions workflow).
    """

    def __init__(self):
        self.token = os.environ["GH_TOKEN"]
        self.pr_number = int(os.environ["PR_NUMBER"])
        # "owner/repo", set automatically by GitHub Actions.
        self.repo = os.environ.get("GITHUB_REPOSITORY", "")

    def get_diff(self) -> str:
        """Diff of this branch against origin/main (three-dot form)."""
        result = subprocess.run(
            ["git", "diff", "origin/main...HEAD"],
            capture_output=True,
            text=True,
        )
        return result.stdout

    def get_changed_files(self) -> list[str]:
        """Names of files changed relative to origin/main."""
        result = subprocess.run(
            ["git", "diff", "--name-only", "origin/main...HEAD"],
            capture_output=True,
            text=True,
        )
        return [f for f in result.stdout.strip().split("\n") if f]

    def review_with_llm(self, diff: str) -> list[dict]:
        """Send the diff to the LLM and return a list of comment dicts.

        Bug fix: the prompt f-string was left unterminated (its closing
        quotes were lost), which is a syntax error. The prompt also asked
        for a bare JSON array, which the ``json_object`` response format
        cannot produce; it now requests an object with a "comments" key,
        matching the parser below.
        """
        import httpx

        # Truncate very large diffs to stay within the context window.
        prompt = f"""Review this git diff. Return a JSON object with a "comments" array.
Each comment: {{"path": "file.py", "line": 42, "body": "issue description", "severity": "critical|warning|suggestion"}}
Only include actionable, specific feedback. No generic praise.
diff:
{diff[:15000]}
"""
        response = httpx.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
            json={
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "response_format": {"type": "json_object"},
            },
            timeout=90,
        )
        result = response.json()["choices"][0]["message"]["content"]
        parsed = json.loads(result)
        # Tolerate either {"comments": [...]} or a bare list.
        return parsed.get("comments", parsed) if isinstance(parsed, dict) else parsed

    def post_review(self, comments: list[dict]):
        """Post a review with inline comments to the PR.

        Approves when there are no comments; requests changes when any
        comment is critical; otherwise posts a non-blocking COMMENT
        review. Exits the process with status 1 if GitHub rejects it.
        """
        import httpx

        # Build PR review comments with a severity marker prefix.
        pr_comments = []
        for c in comments:
            severity_emoji = {
                "critical": "🚨",
                "warning": "⚠️",
                "suggestion": "💡",
            }.get(c.get("severity", "suggestion"), "💡")
            pr_comments.append({
                "path": c["path"],
                "line": c.get("line", 1),
                "body": f"{severity_emoji} **{c.get('severity', 'suggestion').upper()}**\n\n{c['body']}",
                "side": "RIGHT",
            })
        if not pr_comments:
            # Post approval
            body = {
                "event": "APPROVE",
                "body": "✅ AI Review: No issues found. LGTM!",
            }
        else:
            critical = sum(
                1 for c in comments
                if c.get("severity") == "critical"
            )
            body = {
                "event": "REQUEST_CHANGES" if critical > 0 else "COMMENT",
                "body": (
                    f"🤖 **AI Code Review**\n\n"
                    f"Found {len(comments)} issue(s) "
                    f"({critical} critical)\n\n"
                    f"---\n*Automated review by AI pipeline*"
                ),
                "comments": pr_comments,
            }
        response = httpx.post(
            f"https://api.github.com/repos/{self.repo}"
            f"/pulls/{self.pr_number}/reviews",
            headers={
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/vnd.github.v3+json",
            },
            json=body,
            timeout=30,
        )
        if response.status_code in (200, 201):
            print(f"✅ Review posted: {len(pr_comments)} comments")
        else:
            print(f"❌ Failed to post review: {response.status_code}")
            print(response.text)
            sys.exit(1)
def main():
    """Entry point: fetch the PR diff, review it, post the results."""
    gh = GitHubReviewer()
    print("📥 Getting diff...")
    pr_diff = gh.get_diff()
    # Nothing changed relative to origin/main — skip the LLM call entirely.
    if not pr_diff.strip():
        print("No changes to review")
        return
    print(f"📝 Reviewing {len(pr_diff)} chars of diff...")
    review_comments = gh.review_with_llm(pr_diff)
    print(f"📤 Posting {len(review_comments)} comments...")
    gh.post_review(review_comments)


if __name__ == "__main__":
    main()
Setup time: 15 minutes. Copy the files, add your API key as a GitHub secret, done.
Pipeline 5: The Multi-Agent Review Board
The most sophisticated pipeline. Multiple specialized AI "reviewers" collaborate, debate, and produce a consensus review.
import json
from dataclasses import dataclass, field
from typing import Any, Callable
from enum import Enum
class ReviewerRole(Enum):
    """Specialties available on the review board."""

    ARCHITECT = "architect"
    SECURITY = "security"
    PERFORMANCE = "performance"
    DX = "developer_experience"
    MODERATOR = "moderator"
@dataclass
class ReviewerAgent:
    """A specialized code reviewer agent."""

    role: ReviewerRole
    # Display name used in logs and attributed review output.
    name: str
    # Persona/instructions prepended to every prompt for this agent.
    system_prompt: str
    # Low temperature keeps reviews close to reproducible.
    temperature: float = 0.2
    model: str = "gpt-4o"
@dataclass
class ReviewDebateRound:
    """One round of the review debate."""

    round_number: int
    # Agent display name -> that agent's summary for the round.
    # (Keys are names like "Alex (Architecture)", not roles.)
    contributions: dict[str, str]
@dataclass
class ConsensusReview:
    """Final consensus from the review board."""

    decision: str  # approve, request_changes, block
    # Moderator's self-reported confidence, 0.0-1.0.
    confidence: float
    # Synthesized finding dicts (severity/title/description/...).
    key_findings: list[dict]
    debate_rounds: list[ReviewDebateRound]
    # Disagreements the moderator could not resolve.
    dissenting_opinions: list[str]
    final_summary: str
class ReviewBoard:
    """
    Multi-agent code review system.
    Specialized agents review code, then debate
    and reach consensus through a moderator.
    """

    # Default panel: four specialists with distinct, non-overlapping briefs.
    DEFAULT_AGENTS = [
        ReviewerAgent(
            role=ReviewerRole.ARCHITECT,
            name="Alex (Architecture)",
            system_prompt="""You are Alex, a senior software architect.
You focus on:
- System design implications of code changes
- API contracts and backward compatibility
- Dependency management and coupling
- Scalability patterns
Speak concisely. Back claims with specifics.""",
        ),
        ReviewerAgent(
            role=ReviewerRole.SECURITY,
            name="Sam (Security)",
            system_prompt="""You are Sam, a security engineer.
You focus on:
- OWASP Top 10 vulnerabilities
- Authentication and authorization flaws
- Data exposure and privacy
- Supply chain security
You are cautious. Flag anything suspicious.""",
        ),
        ReviewerAgent(
            role=ReviewerRole.PERFORMANCE,
            name="Pat (Performance)",
            system_prompt="""You are Pat, a performance engineer.
You focus on:
- Algorithmic complexity
- Database query patterns
- Memory usage and leaks
- Caching opportunities
- Concurrency issues
Only flag measurable performance impacts.""",
        ),
        ReviewerAgent(
            role=ReviewerRole.DX,
            name="Dana (Developer Experience)",
            system_prompt="""You are Dana, a developer experience advocate.
You focus on:
- Code readability and clarity
- Documentation quality
- Test coverage
- Error messages and debugging
- Onboarding-friendliness
You represent future developers reading this code.""",
        ),
    ]

    # The moderator never reviews the code directly; it only synthesizes
    # the panel's reviews into a decision.
    MODERATOR = ReviewerAgent(
        role=ReviewerRole.MODERATOR,
        name="Morgan (Moderator)",
        system_prompt="""You are Morgan, the review board moderator.
Your job:
1. Synthesize feedback from all reviewers
2. Resolve disagreements with reasoning
3. Produce a final, actionable review
4. Make the approve/block/request_changes decision
Be fair. Weight security concerns highest.
Note any dissenting opinions.""",
        model="gpt-4o",
    )

    # Per-agent prompt; {previous_reviews} lets later agents build on
    # (not repeat) earlier ones. Doubled braces escape literal JSON.
    INDIVIDUAL_REVIEW_PROMPT = """## Your Review
Review this code change from your expert perspective.
{code}
Previous reviewers said:
{previous_reviews}
Provide your unique perspective. Don't repeat what others said.
Return JSON:
{{
"findings": [
{{
"severity": "critical|high|medium|low",
"title": "short title",
"description": "detailed explanation",
"file": "filename (if applicable)",
"line": 0,
"suggested_fix": "optional"
}}
],
"vote": "approve|request_changes|block",
"confidence": 0.0-1.0,
"summary": "one paragraph"
}}"""

    # Moderator prompt: gets every agent's full JSON review plus the code.
    CONSENSUS_PROMPT = """## Build Consensus
You are the moderator. Here are all reviews:
{all_reviews}
Code under review:
{code}
Synthesize into a final review. Resolve disagreements.
Return JSON:
{{
"decision": "approve|request_changes|block",
"confidence": 0.0-1.0,
"key_findings": [
{{
"severity": "critical|high|medium|low",
"title": "finding title",
"description": "synthesized description",
"agreed_by": ["reviewer names"],
"remediation": "how to fix"
}}
],
"dissenting_opinions": ["any unresolved disagreements"],
"summary": "final assessment paragraph"
}}"""

    def __init__(
        self,
        agents: list[ReviewerAgent] | None = None,
        llm_fn: Callable[[str, str, float], str] | None = None,
    ):
        # llm_fn signature: (prompt, model, temperature) -> raw JSON str.
        self.agents = agents or self.DEFAULT_AGENTS
        self.moderator = self.MODERATOR
        self.llm_fn = llm_fn or self._default_llm

    def _default_llm(
        self, prompt: str, model: str = "gpt-4o", temp: float = 0.2
    ) -> str:
        # Default backend: OpenAI chat completions in JSON mode.
        import httpx
        import os

        response = httpx.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
                "Content-Type": "application/json",
            },
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": temp,
                "response_format": {"type": "json_object"},
            },
            timeout=90,
        )
        return response.json()["choices"][0]["message"]["content"]

    def _individual_review(
        self,
        agent: ReviewerAgent,
        code: str,
        previous: str = "None yet — you're first.",
    ) -> dict:
        """Get a review from a single agent.

        Raises json.JSONDecodeError if the model response is not valid
        JSON despite the json_object response format.
        """
        prompt = (
            f"{agent.system_prompt}\n\n"
            + self.INDIVIDUAL_REVIEW_PROMPT.format(
                code=code, previous_reviews=previous
            )
        )
        raw = self.llm_fn(prompt, agent.model, agent.temperature)
        result = json.loads(raw)
        # Tag the review so the moderator can attribute findings.
        result["reviewer"] = agent.name
        return result

    def review(self, code: str, rounds: int = 1) -> ConsensusReview:
        """
        Run the full review board process.
        1. Each agent reviews independently
        2. Optional debate rounds
        3. Moderator synthesizes consensus
        """
        debate_log: list[ReviewDebateRound] = []
        # Round 1: Independent reviews
        print("🔍 Round 1: Independent reviews...")
        reviews = {}
        previous = "None yet — you're first."
        for agent in self.agents:
            print(f" 📝 {agent.name} reviewing...")
            result = self._individual_review(agent, code, previous)
            reviews[agent.name] = result
            # Each subsequent reviewer sees previous reviews.
            # NOTE(review): this makes "round 1" sequential rather than
            # truly independent — later agents can anchor on earlier ones.
            previous = json.dumps(
                {name: r.get("summary", "") for name, r in reviews.items()},
                indent=2,
            )
        debate_log.append(
            ReviewDebateRound(
                round_number=1,
                contributions={
                    name: r.get("summary", "")
                    for name, r in reviews.items()
                },
            )
        )
        # Additional debate rounds (optional): each agent now sees
        # everyone's full latest review, not just summaries.
        for round_num in range(2, rounds + 1):
            print(f"💬 Round {round_num}: Debate...")
            round_contributions = {}
            for agent in self.agents:
                result = self._individual_review(
                    agent, code, json.dumps(reviews, indent=2)
                )
                reviews[agent.name] = result
                round_contributions[agent.name] = result.get("summary", "")
            debate_log.append(
                ReviewDebateRound(
                    round_number=round_num,
                    contributions=round_contributions,
                )
            )
        # Moderator consensus
        print("⚖️ Moderator building consensus...")
        all_reviews_str = json.dumps(reviews, indent=2)
        consensus_prompt = (
            f"{self.moderator.system_prompt}\n\n"
            + self.CONSENSUS_PROMPT.format(
                all_reviews=all_reviews_str, code=code
            )
        )
        raw = self.llm_fn(
            consensus_prompt,
            self.moderator.model,
            self.moderator.temperature,
        )
        consensus = json.loads(raw)
        return ConsensusReview(
            # NOTE(review): a missing "decision" key raises KeyError here
            # while every other field falls back to a default — confirm
            # whether failing fast on decision is intended.
            decision=consensus["decision"],
            confidence=consensus.get("confidence", 0.5),
            key_findings=consensus.get("key_findings", []),
            debate_rounds=debate_log,
            dissenting_opinions=consensus.get("dissenting_opinions", []),
            final_summary=consensus.get("summary", ""),
        )
# --- Usage ---
def run_review_board():
    """Review the current branch's diff against main and print the verdict."""
    board = ReviewBoard()
    # Read the code to review
    import subprocess

    diff = subprocess.run(
        ["git", "diff", "main...HEAD"],
        capture_output=True, text=True,
    ).stdout
    if not diff:
        print("No changes to review")
        return
    result = board.review(diff, rounds=1)
    print("\n" + "=" * 60)
    print(f"📊 REVIEW BOARD DECISION: {result.decision.upper()}")
    print(f" Confidence: {result.confidence:.0%}")
    print("=" * 60)
    print(f"\n{result.final_summary}")
    if result.key_findings:
        print(f"\n📋 Key Findings ({len(result.key_findings)}):")
        for f in result.key_findings:
            # Severity -> marker; unknown severities get a white dot.
            emoji = {"critical": "🚨", "high": "🔴", "medium": "🟡", "low": "🟢"}.get(
                f.get("severity", "low"), "⚪"
            )
            print(f" {emoji} [{f['severity']}] {f['title']}")
            print(f" {f['description'][:100]}")
    if result.dissenting_opinions:
        print(f"\n⚡ Dissenting opinions:")
        for opinion in result.dissenting_opinions:
            print(f" - {opinion}")


if __name__ == "__main__":
    run_review_board()
Cost: ~$0.20-0.50 per review (4 agents + moderator). Worth it for critical PRs.
Choosing the Right Pipeline
| Pipeline | Complexity | Cost/PR | Best For |
|---|---|---|---|
| PR Diff Analyzer | Low | $0.02-0.10 | Every PR |
| Security Scanner | Low | ~$0 (static) | Security-critical repos |
| Multi-Pass | Medium | $0.10-0.30 | Important features |
| GitHub Actions | Medium | $0.02-0.10 | Full automation |
| Review Board | High | $0.20-0.50 | Critical systems |
My recommendation: Start with Pipeline 1 + Pipeline 2. Add Pipeline 4 for CI integration. Use Pipeline 5 for high-stakes releases.
Common Pitfalls to Avoid
1. Reviewing everything with the most expensive pipeline
# ❌ Don't do this
def review_pr(diff):
    # Every PR — even a one-line typo fix — pays the full multi-agent cost.
    return expensive_multi_agent_review(diff)


# ✅ Do this instead
def review_pr(diff, labels: list[str]):
    # Route by stakes: agents for risky PRs, multi-pass for big ones, the
    # cheap single-pass reviewer for everything else. (The review helpers
    # called here are illustrative stand-ins, not defined in this article.)
    if "critical" in labels or "security" in labels:
        return multi_agent_review(diff)
    elif len(diff) > 5000:
        return multi_pass_review(diff)
    else:
        return simple_diff_review(diff)
2. Not filtering noise
# Filter out auto-generated files, lockfiles, etc.
# Filename patterns (fnmatch-style), matched against both the full path
# and the basename — so "package-lock.json" also catches
# "frontend/package-lock.json".
IGNORE_PATTERNS = [
    "*.lock",
    "*.min.js",
    "*.generated.*",
    "package-lock.json",
    "yarn.lock",
    "*.pyc",  # was ".pyc", which matched only a file literally named ".pyc"
]
# Directory names to skip wherever they appear in the path. The old
# "__pycache__/" and "migrations/" entries never matched anything:
# fnmatch compares whole strings, not path segments.
IGNORE_DIRS = {"__pycache__", "migrations"}


def should_review(filepath: str) -> bool:
    """Return True when ``filepath`` is worth sending to AI review."""
    from fnmatch import fnmatch
    from pathlib import PurePosixPath  # git paths always use "/"

    path = PurePosixPath(filepath)
    # Skip anything inside an ignored directory.
    if any(part in IGNORE_DIRS for part in path.parts[:-1]):
        return False
    return not any(
        fnmatch(filepath, pattern) or fnmatch(path.name, pattern)
        for pattern in IGNORE_PATTERNS
    )
3. Not tracking review quality
# Track whether AI reviews were helpful
@dataclass
class ReviewFeedback:
    """Per-PR quality metrics for the AI reviewer itself."""

    pr_number: int
    ai_comments: int
    helpful_comments: int  # Developer marked as useful
    false_positives: int  # Developer dismissed
    missed_issues: int  # Human found what AI missed

    @property
    def precision(self) -> float:
        """Share of triaged comments the developer found useful."""
        triaged = self.helpful_comments + self.false_positives
        if triaged < 1:
            return 0.0
        return self.helpful_comments / triaged

    @property
    def signal_to_noise(self) -> float:
        """Useful comments as a fraction of everything the AI said."""
        if self.ai_comments < 1:
            return float(self.helpful_comments)
        return self.helpful_comments / self.ai_comments
Quick Start: Ship Your First AI Review in 30 Minutes
1. Copy Pipeline 1 into `scripts/ai_review.py`
2. Set your API key: `export OPENAI_API_KEY=your-key`
3. Test locally: `git diff main...HEAD | python scripts/ai_review.py`
4. Add to CI with Pipeline 4's GitHub Action
5. Iterate based on developer feedback
The first version won't be perfect. That's fine. The goal is to start catching obvious issues automatically, then improve over time.
Key Takeaways
- Start with simple diff analysis — it catches more than you'd expect
- Static + AI = best combo — Use regex for known patterns, LLMs for context
- Specialize your reviewers — Multi-pass beats single-prompt every time
- Automate in CI — Reviews only work if they're automatic
- Track precision — If developers ignore your AI reviews, they're noise
The best code review is the one that actually happens. AI makes that possible for every single PR.
Want pre-built review pipeline templates with GitHub Actions configs and customizable review passes? Check out the AI Dev Toolkit — ship reliable code, faster.
Top comments (0)