52 SQL Injection Code Audit Exercises: From Beginner to Expert
Master SQL injection detection through hands-on code review, with no exploit development required.
Whether you're preparing for a Security Engineering interview, studying for the OSWE, or building AppSec skills, these exercises will train your eye to spot vulnerable code patterns across Python's most popular database libraries.
What You'll Learn
- Identify SQL injection vulnerabilities in sqlite3, psycopg2, mysql.connector, SQLAlchemy, and Django ORM
- Recognize when input validation IS sufficient vs. when parameterization is required
- Understand the difference between identifiers (column/table names) and values
- Write secure fixes using proper parameterization patterns
Prerequisites
- Basic Python knowledge
- Basic SQL syntax understanding
- Familiarity with web application concepts
Challenge Format
For each snippet, determine:
- Vulnerable? (Yes/No)
- If yes, write the fix
Scoring Guide:
- Correct identification: 1 point
- Correct fix (if vulnerable): 1 point
Key Concepts Before You Start
Identifiers vs Values
| Type | Examples | Can Parameterize? | Protection |
|---|---|---|---|
| Identifier (or keyword) | Column names, table names, aliases, sort direction (ASC/DESC) | No | Allowlist validation |
| Value | Data in WHERE, LIMIT, function arguments | Yes | Parameterization |
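The two protections usually appear together in one query. The following is a minimal sketch using an in-memory sqlite3 database; the `products` table, its columns, and the `list_products` helper are hypothetical names chosen for illustration, not part of the exercises.

```python
import sqlite3

# Hypothetical allowlist: the only column names that may reach the SQL text.
ALLOWED_SORT_COLUMNS = {"name", "price"}

def list_products(conn, category, sort_by):
    # Identifier: cannot be bound as a parameter, so check it against the allowlist.
    if sort_by not in ALLOWED_SORT_COLUMNS:
        raise ValueError(f"unsupported sort column: {sort_by!r}")
    # Value: bound with a placeholder, never interpolated into the string.
    query = f"SELECT name, price FROM products WHERE category = ? ORDER BY {sort_by}"
    return conn.execute(query, (category,)).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, price REAL, category TEXT)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?, ?)",
    [("widget", 9.5, "tools"), ("anvil", 40.0, "tools"), ("apple", 0.5, "food")],
)
rows = list_products(conn, "tools", "price")
```

A hostile `category` is simply compared as data and matches nothing, while a hostile `sort_by` is rejected before any SQL is built.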
Placeholder Syntax by Library
| Library | Placeholder | Example |
|---|---|---|
| sqlite3 | `?` | `cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))` |
| psycopg2 | `%s` | `cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))` |
| mysql.connector | `%s` | `cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))` |
| SQLAlchemy | `:param` | `conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id})` |
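As a rough illustration of the sqlite3 row, the sketch below binds a single value and then a variable-length `IN (...)` list. The `users` table and the `fetch_users_by_ids` helper are assumptions made for this example; only the `?` markers are interpolated into the SQL text, and the values are always bound.

```python
import sqlite3

def fetch_users_by_ids(conn, user_ids):
    # MySQL and PostgreSQL reject an empty IN () list, so return early.
    if not user_ids:
        return []
    # Only "?" markers are interpolated; the IDs themselves are bound as values.
    placeholders = ", ".join("?" for _ in user_ids)
    query = f"SELECT id, username FROM users WHERE id IN ({placeholders}) ORDER BY id"
    return conn.execute(query, tuple(user_ids)).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)", [(1, "alice"), (2, "bob"), (3, "carol")]
)

single = conn.execute("SELECT username FROM users WHERE id = ?", (2,)).fetchone()
several = fetch_users_by_ids(conn, [1, 3])
hostile = fetch_users_by_ids(conn, ["1 OR 1=1"])  # treated as one odd string value
```

The same shape should carry over to psycopg2 and mysql.connector by swapping `?` for `%s`.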
Round 1: Fundamentals (Snippets 1-5)
Difficulty: Beginner
Snippet 1: Basic Authentication
import sqlite3

def authenticate_user(username, password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
    cursor.execute(query)
    user = cursor.fetchone()
    conn.close()
    if user:
        return {"status": "success", "user_id": user[0]}
    else:
        return {"status": "failed"}
Snippet 2: Product Search
import psycopg2

def search_products(search_term):
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()
    query = "SELECT name, price FROM products WHERE name ILIKE %s"
    cursor.execute(query, (f"%{search_term}%",))
    results = cursor.fetchall()
    conn.close()
    return results
Snippet 3: Account Check
import mysql.connector

def check_account_exists(email):
    conn = mysql.connector.connect(host="localhost", database="accounts")
    cursor = conn.cursor()
    query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
    cursor.execute(query)
    exists = cursor.fetchone() is not None
    conn.close()
    return {"exists": exists}
Snippet 4: Order Status Lookup
from sqlalchemy import create_engine, text

def get_order_status(order_id):
    engine = create_engine('postgresql://localhost/orders')
    with engine.connect() as conn:
        query = text("SELECT status FROM orders WHERE id = :order_id")
        result = conn.execute(query, {"order_id": order_id})
        row = result.fetchone()
    if row:
        return {"status": row[0]}
    return {"status": "not_found"}
Snippet 5: Report Generator
import sqlite3

def generate_report(table_name, date_filter):
    conn = sqlite3.connect('reports.db')
    cursor = conn.cursor()
    allowed_tables = ['sales', 'inventory', 'customers']
    if table_name not in allowed_tables:
        return {"error": "Invalid table"}
    query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return {"data": results}
Round 2: Mixed Patterns (Snippets 6-14)
Difficulty: Intermediate
Snippet 6: User Profile with ID Check
import sqlite3

def get_user_profile(user_id):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    if not user_id.isdigit():
        return {"error": "Invalid user ID"}
    query = f"SELECT username, email, bio FROM users WHERE id = {user_id}"
    cursor.execute(query)
    profile = cursor.fetchone()
    conn.close()
    if profile:
        return {"username": profile[0], "email": profile[1], "bio": profile[2]}
    return {"error": "User not found"}
Snippet 7: Flask Search with Sort
from flask import Flask, request
import psycopg2

app = Flask(__name__)

@app.route('/search')
def search():
    query_param = request.args.get('q', '')
    sort_by = request.args.get('sort', 'name')
    conn = psycopg2.connect("dbname=products")
    cursor = conn.cursor()
    allowed_sort = ['name', 'price', 'date_added']
    if sort_by not in allowed_sort:
        sort_by = 'name'
    query = f"SELECT * FROM products WHERE name ILIKE %s ORDER BY {sort_by}"
    cursor.execute(query, (f"%{query_param}%",))
    results = cursor.fetchall()
    conn.close()
    return {"results": results}
Snippet 8: Bulk Delete
import mysql.connector

def delete_users(user_ids: list):
    conn = mysql.connector.connect(host="localhost", database="app")
    cursor = conn.cursor()
    placeholders = ','.join(['%s'] * len(user_ids))
    query = f"DELETE FROM users WHERE id IN ({placeholders})"
    cursor.execute(query, tuple(user_ids))
    conn.commit()
    deleted_count = cursor.rowcount
    conn.close()
    return {"deleted": deleted_count}
Snippet 9: Login with Remember Token
import sqlite3
import hashlib

def login(username, password, remember_token=None):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    if remember_token:
        query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
        cursor.execute(query)
    else:
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        query = "SELECT * FROM users WHERE username = ? AND password_hash = ?"
        cursor.execute(query, (username, password_hash))
    user = cursor.fetchone()
    conn.close()
    if user:
        return {"status": "success", "user_id": user[0]}
    return {"status": "failed"}
Snippet 10: Django Connection Query
from django.db import connection

def get_orders_by_status(status, customer_id):
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT * FROM orders WHERE status = %s AND customer_id = %s",
            [status, customer_id]
        )
        rows = cursor.fetchall()
    return [{"id": row[0], "total": row[1], "status": row[2]} for row in rows]
Snippet 11: Audit Log Search
import psycopg2
from datetime import datetime

def search_audit_logs(action_type, start_date, end_date, limit=100):
    conn = psycopg2.connect("dbname=audit")
    cursor = conn.cursor()
    query = """
        SELECT timestamp, user_id, action, details
        FROM audit_logs
        WHERE action = %s
        AND timestamp BETWEEN %s AND %s
        ORDER BY timestamp DESC
        LIMIT """ + str(limit)
    cursor.execute(query, (action_type, start_date, end_date))
    logs = cursor.fetchall()
    conn.close()
    return {"logs": logs}
Snippet 12: User Registration
import sqlite3
import hashlib

def register_user(username, email, password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    cursor.execute("SELECT 1 FROM users WHERE username = ?", (username,))
    if cursor.fetchone():
        conn.close()
        return {"error": "Username taken"}
    cursor.execute("SELECT 1 FROM users WHERE email = ?", (email,))
    if cursor.fetchone():
        conn.close()
        return {"error": "Email already registered"}
    password_hash = hashlib.sha256(password.encode()).hexdigest()
    cursor.execute(
        "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
        (username, email, password_hash)
    )
    conn.commit()
    user_id = cursor.lastrowid
    conn.close()
    return {"success": True, "user_id": user_id}
Snippet 13: Dynamic Report Builder
import psycopg2

def build_report(table, columns, filters):
    """
    table: string - table name
    columns: list - columns to select
    filters: dict - {column: value} pairs for WHERE clause
    """
    conn = psycopg2.connect("dbname=reports")
    cursor = conn.cursor()
    allowed_tables = ['sales', 'inventory', 'employees']
    allowed_columns = ['id', 'name', 'amount', 'date', 'department']
    if table not in allowed_tables:
        return {"error": "Invalid table"}
    safe_columns = [c for c in columns if c in allowed_columns]
    if not safe_columns:
        return {"error": "No valid columns"}
    column_str = ', '.join(safe_columns)
    where_clauses = []
    values = []
    for col, val in filters.items():
        if col in allowed_columns:
            where_clauses.append(f"{col} = %s")
            values.append(val)
    query = f"SELECT {column_str} FROM {table}"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)
    cursor.execute(query, tuple(values))
    results = cursor.fetchall()
    conn.close()
    return {"data": results}
Snippet 14: Password Reset
import sqlite3
import secrets

def request_password_reset(email):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    cursor.execute("SELECT id FROM users WHERE email = ?", (email,))
    user = cursor.fetchone()
    if not user:
        conn.close()
        return {"message": "If email exists, reset link sent"}
    reset_token = secrets.token_urlsafe(32)
    cursor.execute(
        "UPDATE users SET reset_token = ?, reset_expires = datetime('now', '+1 hour') WHERE id = ?",
        (reset_token, user[0])
    )
    conn.commit()
    conn.close()
    return {"message": "If email exists, reset link sent"}

def reset_password(token, new_password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
    cursor.execute(query)
    user = cursor.fetchone()
    if not user:
        conn.close()
        return {"error": "Invalid or expired token"}
    cursor.execute(
        "UPDATE users SET password_hash = ?, reset_token = NULL WHERE id = ?",
        (new_password, user[0])
    )
    conn.commit()
    conn.close()
    return {"success": True}
Round 3: Identifier Patterns (Snippets 15-26)
Difficulty: Intermediate-Advanced
Snippet 15: Dynamic Column Filter
import psycopg2

def filter_products(column, value):
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()
    allowed_columns = ['category', 'brand', 'status']
    if column not in allowed_columns:
        return {"error": "Invalid filter column"}
    query = f"SELECT * FROM products WHERE {column} = %s"
    cursor.execute(query, (value,))
    results = cursor.fetchall()
    conn.close()
    return {"products": results}
Snippet 16: Sort Direction
import sqlite3

def get_users_sorted(sort_dir):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
    cursor.execute(query)
    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Snippet 17: Numeric ID Check
import mysql.connector

def get_order(order_id):
    conn = mysql.connector.connect(host="localhost", database="shop")
    cursor = conn.cursor()
    try:
        order_id = int(order_id)
    except ValueError:
        return {"error": "Invalid order ID"}
    query = f"SELECT * FROM orders WHERE id = {order_id}"
    cursor.execute(query)
    order = cursor.fetchone()
    conn.close()
    return {"order": order}
Snippet 18: Multiple Column Sort
import psycopg2

def search_inventory(search_term, sort_columns: list):
    conn = psycopg2.connect("dbname=warehouse")
    cursor = conn.cursor()
    allowed_sorts = ['name', 'quantity', 'price', 'updated_at']
    safe_sorts = [col for col in sort_columns if col in allowed_sorts]
    if not safe_sorts:
        safe_sorts = ['name']
    sort_str = ', '.join(safe_sorts)
    query = f"SELECT * FROM inventory WHERE name ILIKE %s ORDER BY {sort_str}"
    cursor.execute(query, (f"%{search_term}%",))
    results = cursor.fetchall()
    conn.close()
    return {"items": results}
Snippet 19: Pagination
import sqlite3

def get_posts(page, per_page):
    conn = sqlite3.connect('blog.db')
    cursor = conn.cursor()
    offset = (page - 1) * per_page
    query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
    cursor.execute(query)
    posts = cursor.fetchall()
    conn.close()
    return {"posts": posts}
Snippet 20: Schema Browser
import psycopg2

def get_table_columns(table_name):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()
    query = f"""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = '{table_name}'
    """
    cursor.execute(query)
    columns = cursor.fetchall()
    conn.close()
    return {"columns": columns}
Snippet 21: Aggregate Query
import mysql.connector

def get_sales_summary(group_by_col, year):
    conn = mysql.connector.connect(host="localhost", database="sales")
    cursor = conn.cursor()
    allowed_groups = ['product_id', 'category', 'region', 'salesperson']
    if group_by_col not in allowed_groups:
        group_by_col = 'category'
    if not isinstance(year, int) or year < 2000 or year > 2100:
        return {"error": "Invalid year"}
    query = f"""
        SELECT {group_by_col}, SUM(amount) as total
        FROM sales
        WHERE YEAR(sale_date) = {year}
        GROUP BY {group_by_col}
    """
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return {"summary": results}
Snippet 22: User Preferences Update
import sqlite3

def update_preference(user_id, pref_key, pref_value):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    allowed_prefs = ['theme', 'language', 'timezone', 'notifications']
    if pref_key not in allowed_prefs:
        return {"error": "Invalid preference"}
    query = f"UPDATE user_preferences SET {pref_key} = ? WHERE user_id = ?"
    cursor.execute(query, (pref_value, user_id))
    conn.commit()
    conn.close()
    return {"success": True}
Snippet 23: Log Search with Date Range
import psycopg2

def search_logs(start_date, end_date, log_level):
    conn = psycopg2.connect("dbname=logging")
    cursor = conn.cursor()
    query = """
        SELECT timestamp, level, message
        FROM logs
        WHERE timestamp >= %s
        AND timestamp <= %s
        AND level = '""" + log_level + "' ORDER BY timestamp DESC"
    cursor.execute(query, (start_date, end_date))
    logs = cursor.fetchall()
    conn.close()
    return {"logs": logs}
Snippet 24: Dynamic IN Clause
import sqlite3

def get_users_by_role(roles: list):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    allowed_roles = ['admin', 'editor', 'viewer', 'guest']
    safe_roles = [r for r in roles if r in allowed_roles]
    if not safe_roles:
        return {"error": "No valid roles provided"}
    placeholders = ','.join(['?'] * len(safe_roles))
    query = f"SELECT id, username, role FROM users WHERE role IN ({placeholders})"
    cursor.execute(query, tuple(safe_roles))
    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Snippet 25: Conditional Column Selection
import mysql.connector

def export_users(include_email, include_phone):
    conn = mysql.connector.connect(host="localhost", database="app")
    cursor = conn.cursor()
    columns = ['id', 'username']
    if include_email:
        columns.append('email')
    if include_phone:
        columns.append('phone')
    column_str = ', '.join(columns)
    query = f"SELECT {column_str} FROM users"
    cursor.execute(query)
    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Snippet 26: Boolean Filter
import psycopg2

def get_active_items(category, is_active):
    conn = psycopg2.connect("dbname=inventory")
    cursor = conn.cursor()
    if not isinstance(is_active, bool):
        return {"error": "is_active must be boolean"}
    query = f"SELECT * FROM items WHERE category = %s AND active = {is_active}"
    cursor.execute(query, (category,))
    items = cursor.fetchall()
    conn.close()
    return {"items": items}
Round 4: Complex Patterns (Snippets 27-38)
Difficulty: Advanced
Snippet 27: Search with Multiple Filters
import psycopg2

def advanced_search(filters: dict):
    """
    filters: {"name": "widget", "min_price": 10, "max_price": 100}
    """
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()
    allowed_filters = ['name', 'min_price', 'max_price', 'category', 'brand']
    where_clauses = []
    values = []
    for key, val in filters.items():
        if key not in allowed_filters:
            continue
        if key == 'name':
            where_clauses.append("name ILIKE %s")
            values.append(f"%{val}%")
        elif key == 'min_price':
            where_clauses.append("price >= %s")
            values.append(val)
        elif key == 'max_price':
            where_clauses.append("price <= %s")
            values.append(val)
        elif key in ('category', 'brand'):
            where_clauses.append(f"{key} = %s")
            values.append(val)
    query = "SELECT * FROM products"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)
    cursor.execute(query, tuple(values))
    results = cursor.fetchall()
    conn.close()
    return {"products": results}
Snippet 28: JSON Field Query
import sqlite3

def search_metadata(field_name, field_value):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
    cursor.execute(query, (field_value,))
    results = cursor.fetchall()
    conn.close()
    return {"documents": results}
Snippet 29: Bulk Status Update
import mysql.connector

def update_order_status(order_ids: list, new_status):
    conn = mysql.connector.connect(host="localhost", database="shop")
    cursor = conn.cursor()
    allowed_statuses = ['pending', 'processing', 'shipped', 'delivered', 'cancelled']
    if new_status not in allowed_statuses:
        return {"error": "Invalid status"}
    if not all(isinstance(oid, int) for oid in order_ids):
        return {"error": "Invalid order IDs"}
    placeholders = ','.join(['%s'] * len(order_ids))
    query = f"UPDATE orders SET status = %s WHERE id IN ({placeholders})"
    cursor.execute(query, (new_status, *order_ids))
    conn.commit()
    conn.close()
    return {"updated": cursor.rowcount}
Snippet 30: Column Alias
import psycopg2

def get_report(value_column, label):
    conn = psycopg2.connect("dbname=reports")
    cursor = conn.cursor()
    allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']
    if value_column not in allowed_columns:
        return {"error": "Invalid column"}
    query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return {"report": results}
Snippet 31: SQLAlchemy Text Query
from sqlalchemy import create_engine, text

def find_user_by_field(field, value):
    engine = create_engine('postgresql://localhost/app')
    allowed_fields = ['username', 'email', 'phone']
    if field not in allowed_fields:
        return {"error": "Invalid field"}
    with engine.connect() as conn:
        query = text(f"SELECT * FROM users WHERE {field} = :value")
        result = conn.execute(query, {"value": value})
        user = result.fetchone()
    return {"user": user}
Snippet 32: Date Range with Regex
import sqlite3
import re

def get_events(start_date, end_date):
    conn = sqlite3.connect('events.db')
    cursor = conn.cursor()
    date_pattern = r'^\d{4}-\d{2}-\d{2}$'
    if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
        return {"error": "Invalid date format. Use YYYY-MM-DD"}
    query = f"SELECT * FROM events WHERE event_date BETWEEN '{start_date}' AND '{end_date}'"
    cursor.execute(query)
    events = cursor.fetchall()
    conn.close()
    return {"events": events}
Snippet 33: Subquery Filter
import psycopg2

def get_top_customers(min_orders):
    conn = psycopg2.connect("dbname=shop")
    cursor = conn.cursor()
    try:
        min_orders = int(min_orders)
    except (ValueError, TypeError):
        return {"error": "Invalid minimum orders value"}
    query = """
        SELECT c.id, c.name, c.email
        FROM customers c
        WHERE (SELECT COUNT(*) FROM orders o WHERE o.customer_id = c.id) >= %s
    """
    cursor.execute(query, (min_orders,))
    customers = cursor.fetchall()
    conn.close()
    return {"customers": customers}
Snippet 34: LIKE with Match Types
import mysql.connector

def search_products(search_term, match_type):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()
    if match_type == 'starts_with':
        pattern = f"{search_term}%"
    elif match_type == 'ends_with':
        pattern = f"%{search_term}"
    elif match_type == 'contains':
        pattern = f"%{search_term}%"
    else:
        pattern = search_term
    query = "SELECT * FROM products WHERE name LIKE %s"
    cursor.execute(query, (pattern,))
    results = cursor.fetchall()
    conn.close()
    return {"products": results}
Snippet 35: Django ORM
from django.contrib.auth.models import User

def search_users(search_term, order_by):
    allowed_ordering = ['username', 'email', 'date_joined', '-username', '-email', '-date_joined']
    if order_by not in allowed_ordering:
        order_by = 'username'
    users = User.objects.filter(
        username__icontains=search_term
    ).order_by(order_by)
    return list(users.values('id', 'username', 'email'))
Snippet 36: Table Join
import sqlite3

def get_order_details(order_id, include_customer):
    conn = sqlite3.connect('shop.db')
    cursor = conn.cursor()
    if include_customer:
        query = """
            SELECT o.*, c.name, c.email
            FROM orders o
            JOIN customers c ON o.customer_id = c.id
            WHERE o.id = ?
        """
    else:
        query = "SELECT * FROM orders WHERE id = ?"
    cursor.execute(query, (order_id,))
    order = cursor.fetchone()
    conn.close()
    return {"order": order}
Snippet 37: Regex Column Validation
import psycopg2
import re

def get_column_stats(column_name):
    conn = psycopg2.connect("dbname=analytics")
    cursor = conn.cursor()
    if not re.match(r'^[a-z_]+$', column_name):
        return {"error": "Invalid column name"}
    query = f"SELECT MIN({column_name}), MAX({column_name}), AVG({column_name}) FROM metrics"
    cursor.execute(query)
    stats = cursor.fetchone()
    conn.close()
    return {"min": stats[0], "max": stats[1], "avg": stats[2]}
Snippet 38: Cursor Pagination
import psycopg2

def get_paginated_items(cursor_id, limit, direction):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()
    if not isinstance(cursor_id, int):
        cursor_id = 0
    if not isinstance(limit, int) or limit < 1 or limit > 100:
        limit = 20
    if direction == 'next':
        operator = '>'
        order = 'ASC'
    elif direction == 'prev':
        operator = '<'
        order = 'DESC'
    else:
        return {"error": "Invalid direction"}
    query = f"SELECT * FROM items WHERE id {operator} %s ORDER BY id {order} LIMIT %s"
    cursor.execute(query, (cursor_id, limit))
    items = cursor.fetchall()
    conn.close()
    return {"items": items}
Round 5: Expert Patterns (Snippets 39-52)
Difficulty: Expert
Snippet 39: UUID Validation
import sqlite3
import re

def get_session(session_id):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    uuid_pattern = r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'
    if not re.match(uuid_pattern, session_id):
        return {"error": "Invalid session ID"}
    query = f"SELECT * FROM sessions WHERE id = '{session_id}'"
    cursor.execute(query)
    session = cursor.fetchone()
    conn.close()
    return {"session": session}
Snippet 40: PostgreSQL JSON Operator
import psycopg2

def search_by_tag(tag_name):
    conn = psycopg2.connect("dbname=content")
    cursor = conn.cursor()
    query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return {"articles": results}
Snippet 41: Numeric String Check
import mysql.connector

def get_product(product_id):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()
    if not str(product_id).isnumeric():
        return {"error": "Invalid product ID"}
    query = f"SELECT * FROM products WHERE id = {product_id}"
    cursor.execute(query)
    product = cursor.fetchone()
    conn.close()
    return {"product": product}
Snippet 42: Window Function
import psycopg2

def get_ranked_sales(partition_col):
    conn = psycopg2.connect("dbname=sales")
    cursor = conn.cursor()
    allowed_partitions = ['region', 'category', 'salesperson', 'quarter']
    if partition_col not in allowed_partitions:
        return {"error": "Invalid partition column"}
    query = f"""
        SELECT *, RANK() OVER (PARTITION BY {partition_col} ORDER BY amount DESC) as rank
        FROM sales
    """
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return {"sales": results}
Snippet 43: COALESCE Function
import sqlite3

def get_user_display_name(user_id, default_name):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
    cursor.execute(query, (user_id,))
    result = cursor.fetchone()
    conn.close()
    return {"display_name": result[0] if result else None}
Snippet 44: Offset Pagination with Validation
import psycopg2

def get_comments(post_id, page):
    conn = psycopg2.connect("dbname=blog")
    cursor = conn.cursor()
    try:
        page = int(page)
        if page < 1:
            page = 1
    except (ValueError, TypeError):
        page = 1
    offset = (page - 1) * 20
    query = f"SELECT * FROM comments WHERE post_id = %s ORDER BY created_at LIMIT 20 OFFSET {offset}"
    cursor.execute(query, (post_id,))
    comments = cursor.fetchall()
    conn.close()
    return {"comments": comments}
Snippet 45: CAST Function
import mysql.connector

def search_by_year(year_input):
    conn = mysql.connector.connect(host="localhost", database="archive")
    cursor = conn.cursor()
    query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return {"documents": results}
Snippet 46: PostgreSQL ANY Operator
import psycopg2

def find_users_with_role(role):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()
    allowed_roles = ['admin', 'editor', 'viewer', 'moderator']
    if role not in allowed_roles:
        return {"error": "Invalid role"}
    query = f"SELECT * FROM users WHERE '{role}' = ANY(roles)"
    cursor.execute(query)
    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Snippet 47: INTERVAL Expression
import psycopg2

def get_recent_activity(hours_ago):
    conn = psycopg2.connect("dbname=activity")
    cursor = conn.cursor()
    if not isinstance(hours_ago, int) or hours_ago < 1 or hours_ago > 168:
        hours_ago = 24
    query = f"SELECT * FROM activity_log WHERE timestamp > NOW() - INTERVAL '{hours_ago} hours'"
    cursor.execute(query)
    activities = cursor.fetchall()
    conn.close()
    return {"activities": activities}
Snippet 48: Simple Parameterization
import sqlite3

def search_full_name(first_name, last_name):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    query = "SELECT * FROM users WHERE first_name = ? AND last_name = ?"
    cursor.execute(query, (first_name, last_name))
    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Snippet 49: Table Function with isidentifier
import psycopg2

def get_table_size(table_name):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()
    if not table_name.isidentifier():
        return {"error": "Invalid table name"}
    query = f"SELECT pg_size_pretty(pg_total_relation_size('{table_name}'))"
    cursor.execute(query)
    size = cursor.fetchone()
    conn.close()
    return {"size": size[0]}
Snippet 50: Float Validation
import mysql.connector

def get_products_under_price(max_price):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()
    try:
        max_price = float(max_price)
    except (ValueError, TypeError):
        return {"error": "Invalid price"}
    query = f"SELECT * FROM products WHERE price <= {max_price}"
    cursor.execute(query)
    products = cursor.fetchall()
    conn.close()
    return {"products": products}
Snippet 51: EXISTS Subquery
import sqlite3

def get_authors_with_posts(min_posts):
    conn = sqlite3.connect('blog.db')
    cursor = conn.cursor()
    try:
        min_posts = int(min_posts)
    except ValueError:
        return {"error": "Invalid minimum posts"}
    query = """
        SELECT * FROM authors a
        WHERE EXISTS (
            SELECT 1 FROM posts p
            WHERE p.author_id = a.id
            GROUP BY p.author_id
            HAVING COUNT(*) >= ?
        )
    """
    cursor.execute(query, (min_posts,))
    authors = cursor.fetchall()
    conn.close()
    return {"authors": authors}
Snippet 52: Schema-Qualified Table
import psycopg2

def get_data(schema_name, limit):
    conn = psycopg2.connect("dbname=warehouse")
    cursor = conn.cursor()
    allowed_schemas = ['public', 'staging', 'archive']
    if schema_name not in allowed_schemas:
        return {"error": "Invalid schema"}
    if not isinstance(limit, int) or limit < 1:
        limit = 100
    query = f"SELECT * FROM {schema_name}.reports ORDER BY created_at DESC LIMIT {limit}"
    cursor.execute(query)
    data = cursor.fetchall()
    conn.close()
    return {"data": data}
Answer Key
Round 1 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 1 | Yes | f-string interpolation of `username` and `password` |
| 2 | No | Properly parameterized with `%s` |
| 3 | Yes | String concatenation of `email` |
| 4 | No | SQLAlchemy `:param` parameterization |
| 5 | Yes | `date_filter` interpolated via f-string (table is allowlisted) |
Snippet 1 Fix
# Vulnerable
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)
# Fixed
query = "SELECT * FROM users WHERE username = ? AND password = ?"
cursor.execute(query, (username, password))
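To see why the f-string version counts as vulnerable, the sketch below runs both queries against a throwaway in-memory database. The table contents, the payload, and the `login_vulnerable` / `login_fixed` helper names are illustrative assumptions, not part of the exercise.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, password TEXT)")
conn.execute("INSERT INTO users (username, password) VALUES ('alice', 's3cret')")

def login_vulnerable(username, password):
    # Same shape as the vulnerable query: input becomes part of the SQL text.
    query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
    return conn.execute(query).fetchone()

def login_fixed(username, password):
    # Same shape as the fixed query: input is bound as data.
    query = "SELECT * FROM users WHERE username = ? AND password = ?"
    return conn.execute(query, (username, password)).fetchone()

payload = "' OR '1'='1"
bypassed = login_vulnerable("alice", payload)  # the WHERE clause becomes always true
rejected = login_fixed("alice", payload)       # the payload is just a wrong password
```

With the payload, the vulnerable query's condition reads `... AND password = '' OR '1'='1'`, which is true for every row, so a user row comes back without the real password.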
Snippet 3 Fix
# Vulnerable
query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
cursor.execute(query)
# Fixed
query = "SELECT 1 FROM users WHERE email = %s LIMIT 1"
cursor.execute(query, (email,))
Snippet 5 Fix
# Vulnerable
query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
cursor.execute(query)
# Fixed (table_name already allowlisted, just parameterize date_filter)
query = f"SELECT * FROM {table_name} WHERE created_at > ?"
cursor.execute(query, (date_filter,))
Round 2 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 6 | No | `.isdigit()` allows only digit characters, so no SQL syntax can be injected |
| 7 | No | `sort_by` allowlisted, `query_param` parameterized |
| 8 | No | Placeholders generated safely, values parameterized |
| 9 | Yes | `remember_token` branch uses f-string |
| 10 | No | Django parameterization with `%s` |
| 11 | Yes | `limit` concatenated via `str()` |
| 12 | No | All queries use `?` parameterization |
| 13 | No | All identifiers allowlisted, values parameterized |
| 14 | Yes | `reset_password` function uses f-string for `token` |
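One nuance about Snippet 6: `str.isdigit()` is broader than "the characters 0-9". It also accepts some non-ASCII digit characters, which cannot express SQL syntax but may make an interpolated query fail at the database, so parameterization is still the more robust choice. The sketch below shows the difference; `is_ascii_digits` is a hypothetical stricter helper, not something the snippet defines.

```python
def is_ascii_digits(value):
    # Stricter than str.isdigit(): accepts only the characters 0-9.
    return isinstance(value, str) and value.isascii() and value.isdigit()

superscript_two = "\u00b2"     # SUPERSCRIPT TWO
arabic_indic_three = "\u0663"  # ARABIC-INDIC DIGIT THREE

checks = {
    "isdigit_superscript": superscript_two.isdigit(),
    "isdecimal_superscript": superscript_two.isdecimal(),
    "isdigit_arabic": arabic_indic_three.isdigit(),
    "int_arabic": int(arabic_indic_three),
    "strict_superscript": is_ascii_digits(superscript_two),
    "strict_plain": is_ascii_digits("42"),
}
```

Converting with `int()` and binding the result, as Snippet 33 does, sidesteps the question of which characters the check lets through.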
Snippet 9 Fix
# Vulnerable
if remember_token:
    query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
    cursor.execute(query)
# Fixed
if remember_token:
    query = "SELECT * FROM users WHERE remember_token = ?"
    cursor.execute(query, (remember_token,))
Snippet 11 Fix
# Vulnerable
query = """
SELECT timestamp, user_id, action, details
FROM audit_logs
WHERE action = %s
AND timestamp BETWEEN %s AND %s
ORDER BY timestamp DESC
LIMIT """ + str(limit)
cursor.execute(query, (action_type, start_date, end_date))
# Fixed
query = """
SELECT timestamp, user_id, action, details
FROM audit_logs
WHERE action = %s
AND timestamp BETWEEN %s AND %s
ORDER BY timestamp DESC
LIMIT %s
"""
cursor.execute(query, (action_type, start_date, end_date, limit))
Snippet 14 Fix (reset_password function)
# Vulnerable
query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
cursor.execute(query)
# Fixed
query = "SELECT id FROM users WHERE reset_token = ? AND reset_expires > datetime('now')"
cursor.execute(query, (token,))
Round 3 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 15 | No | `column` allowlisted, `value` parameterized |
| 16 | Yes | `sort_dir` not validated |
| 17 | No | `int()` conversion prevents injection |
| 18 | No | Columns filtered against allowlist |
| 19 | Yes | `page` and `per_page` not validated |
| 20 | Yes | `table_name` interpolated without validation |
| 21 | No | `group_by_col` allowlisted, `year` validated with `isinstance(int)` |
| 22 | No | `pref_key` allowlisted, values parameterized |
| 23 | Yes | `log_level` concatenated |
| 24 | No | Roles filtered against allowlist, then parameterized |
| 25 | No | Boolean flags control hardcoded columns |
| 26 | No | `isinstance(bool)` only allows True/False |
Snippet 16 Fix
# Vulnerable
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
cursor.execute(query)
# Fixed (allowlist validation; ASC/DESC cannot be parameterized)
allowed_directions = ['ASC', 'DESC']
sort_dir = sort_dir.upper()
if sort_dir not in allowed_directions:
    sort_dir = 'ASC'
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
cursor.execute(query)
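A variation on the same allowlist idea is to map the user-facing value to a fixed keyword, so that no user-supplied text is ever placed in the SQL string. This is a sketch under that assumption; `SORT_DIRECTIONS` and `build_user_query` are hypothetical names.

```python
# Hypothetical mapping from user-facing values to fixed SQL keywords.
SORT_DIRECTIONS = {"asc": "ASC", "desc": "DESC"}

def build_user_query(sort_dir):
    # Unknown or missing input falls back to ASC; user text never reaches the SQL string.
    direction = SORT_DIRECTIONS.get(str(sort_dir).strip().lower(), "ASC")
    return f"SELECT id, username, created_at FROM users ORDER BY created_at {direction}"

descending = build_user_query("desc")
fallback = build_user_query("DESC; DROP TABLE users")
```

Because the interpolated value always comes from the dictionary, a reviewer only has to audit the dictionary's contents rather than every call site.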
Snippet 19 Fix
# Vulnerable
offset = (page - 1) * per_page
query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
cursor.execute(query)
# Fixed (add validation + parameterization)
try:
    page = int(page)
    per_page = int(per_page)
    if page < 1:
        page = 1
    if per_page < 1 or per_page > 100:
        per_page = 20
except (ValueError, TypeError):
    page, per_page = 1, 20
offset = (page - 1) * per_page
query = "SELECT * FROM posts ORDER BY created_at DESC LIMIT ? OFFSET ?"
cursor.execute(query, (per_page, offset))
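The validation in this fix can be pulled into a small helper so it is easy to test on its own. The sketch below is one possible arrangement, not the only one; `clamp_pagination` and the simplified `posts` table are assumptions for illustration, and the defaults mirror the fix above.

```python
import sqlite3

def clamp_pagination(page, per_page, default_per_page=20, max_per_page=100):
    # Coerce to int and fall back to defaults on bad input, as in the fix above.
    try:
        page = int(page)
        per_page = int(per_page)
    except (ValueError, TypeError):
        return 1, default_per_page
    if page < 1:
        page = 1
    if per_page < 1 or per_page > max_per_page:
        per_page = default_per_page
    return page, per_page

def get_posts(conn, page, per_page):
    page, per_page = clamp_pagination(page, per_page)
    offset = (page - 1) * per_page
    # LIMIT and OFFSET are values, so they can be bound like any other parameter.
    query = "SELECT id FROM posts ORDER BY id LIMIT ? OFFSET ?"
    return [row[0] for row in conn.execute(query, (per_page, offset))]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO posts VALUES (?)", [(i,) for i in range(1, 51)])
second_page = get_posts(conn, 2, 10)
```

The upper bound on `per_page` is not about injection; it keeps a single request from pulling an arbitrarily large result set.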
Snippet 20 Fix
# Vulnerable
query = f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = '{table_name}'
"""
cursor.execute(query)
# Fixed
query = """
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = %s
"""
cursor.execute(query, (table_name,))
Snippet 23 Fix
# Vulnerable
query = """
SELECT timestamp, level, message
FROM logs
WHERE timestamp >= %s
AND timestamp <= %s
AND level = '""" + log_level + "' ORDER BY timestamp DESC"
cursor.execute(query, (start_date, end_date))
# Fixed
query = """
SELECT timestamp, level, message
FROM logs
WHERE timestamp >= %s
AND timestamp <= %s
AND level = %s
ORDER BY timestamp DESC
"""
cursor.execute(query, (start_date, end_date, log_level))
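To see why mixing placeholders with concatenation is still exploitable, it can help to run a payload against both versions. The sketch below is an approximation using sqlite3 (so `?` instead of `%s`), with an assumed `logs` table and a simplified query; the payload string is a textbook example, not one taken from the exercise.

```python
import sqlite3

# Assumed demo data in an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logs (timestamp TEXT, level TEXT, message TEXT)")
conn.executemany(
    "INSERT INTO logs VALUES (?, ?, ?)",
    [("2024-01-01", "INFO", "started"), ("2024-01-02", "ERROR", "disk full")],
)

payload = "INFO' OR '1'='1"

# Vulnerable: the date is bound, but log_level is concatenated into the SQL text.
vulnerable = (
    "SELECT message FROM logs WHERE timestamp >= ? AND level = '" + payload + "'"
)
leaked = conn.execute(vulnerable, ("2024-01-01",)).fetchall()

# Fixed: every value is bound, so the payload is compared as a literal string.
safe = "SELECT message FROM logs WHERE timestamp >= ? AND level = ?"
matched = conn.execute(safe, ("2024-01-01", payload)).fetchall()
```

In the vulnerable query the trailing `OR '1'='1'` makes the WHERE clause true for every row, so `leaked` contains both messages, while `matched` is empty because no log has that literal level.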
Round 4 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 27 | No | Keys allowlisted, values parameterized |
| 28 | Yes | `field_name` in JSON path not validated |
| 29 | No | Status allowlisted, IDs type-checked |
| 30 | Yes | `label` (alias) not validated |
| 31 | No | `field` allowlisted, value parameterized |
| 32 | No | Strict regex allows only `YYYY-MM-DD` |
| 33 | No | `int()` + parameterization |
| 34 | No | Pattern built safely, parameterized |
| 35 | No | Django ORM + allowlisted ordering |
| 36 | No | Boolean selects hardcoded queries |
| 37 | No | Strict regex `^[a-z_]+$` |
| 38 | No | Type checks + hardcoded operators |
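One subtlety worth knowing when auditing regex-based validation such as the patterns in Snippets 32 and 37: in Python, `$` also matches just before a trailing newline. The sketch below assumes the checks are written with `re.match` (the helper names are hypothetical) and compares that with `re.fullmatch`.

```python
import re

IDENT_RE = re.compile(r"^[a-z_]+$")

def is_identifier_match(value):
    # `$` matches at the end of the string OR just before a final "\n".
    return IDENT_RE.match(value) is not None

def is_identifier_fullmatch(value):
    # fullmatch requires the whole string to match, newline included.
    return re.fullmatch(r"[a-z_]+", value) is not None

with_newline = "user_name\n"
results = (is_identifier_match(with_newline), is_identifier_fullmatch(with_newline))
```

A lone trailing newline is treated as whitespace by SQL parsers, so this does not turn those snippets into injection bugs, but `re.fullmatch` (or `\Z` in place of `$`) is the stricter choice when the intent is "nothing but these characters".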
Snippet 28 Fix
# Vulnerable
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
# Fixed (allowlist validation limits which JSON keys can be queried)
allowed_fields = ['author', 'category', 'status', 'priority']
if field_name not in allowed_fields:
    return {"error": "Invalid field name"}
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
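In SQLite the path argument of `json_extract` is an ordinary string value, so it can also be bound as a parameter instead of being interpolated. The sketch below combines that with the allowlist; it assumes a Python build whose SQLite includes the JSON functions, and the `find_documents` helper and demo table are hypothetical.

```python
import json
import sqlite3

ALLOWED_FIELDS = {"author", "category", "status", "priority"}

def find_documents(conn, field_name, field_value):
    # The allowlist still decides which keys callers may query.
    if field_name not in ALLOWED_FIELDS:
        raise ValueError("Invalid field name")
    # The JSON path is passed as a bound value, so it is never parsed as SQL.
    query = "SELECT id FROM documents WHERE json_extract(metadata, ?) = ?"
    return conn.execute(query, (f"$.{field_name}", field_value)).fetchall()

# Assumed demo data in an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id INTEGER PRIMARY KEY, metadata TEXT)")
conn.execute(
    "INSERT INTO documents (metadata) VALUES (?)",
    (json.dumps({"author": "alice", "status": "draft"}),),
)
```

Binding the path removes the SQL injection on its own; the allowlist remains useful as an access-control measure, since it stops callers from probing arbitrary keys in `metadata`.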
Snippet 30 Fix
# Vulnerable
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
# Fixed (allowlist validation; aliases cannot be parameterized)
allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']
allowed_labels = ['total', 'amount', 'value', 'metric', 'result']
if value_column not in allowed_columns:
    return {"error": "Invalid column"}
if label not in allowed_labels:
    return {"error": "Invalid label"}
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
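An alternative design is to keep the user-chosen label out of the SQL entirely: use a fixed alias in the query and apply the label in Python when shaping the result. This is a sketch under assumptions; the `monthly_report` helper, the fixed alias `value`, and the demo table are hypothetical.

```python
import sqlite3

ALLOWED_COLUMNS = {"revenue", "expenses", "profit", "units_sold"}

def monthly_report(conn, value_column, label):
    # The column is an identifier, so it still needs allowlist validation.
    if value_column not in ALLOWED_COLUMNS:
        raise ValueError("Invalid column")
    # The alias is hardcoded; the caller's label never reaches the database.
    query = f"SELECT date, {value_column} AS value FROM monthly_reports ORDER BY date"
    rows = conn.execute(query).fetchall()
    # The label is only used as a dictionary key on the Python side.
    return [{"date": date, label: value} for date, value in rows]

# Assumed demo data in an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE monthly_reports (date TEXT, revenue REAL, expenses REAL)")
conn.execute("INSERT INTO monthly_reports VALUES ('2024-01-01', 100.0, 40.0)")
```

This trades the label allowlist for free-form labels, which may or may not be desirable; if the label is later rendered in HTML it still needs output encoding.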
Round 5 Answers
| Snippet | Vulnerable? | Reason |
|---|---|---|
| 39 | No | UUID regex only allows hex + hyphens |
| 40 | Yes | `tag_name` in ILIKE not parameterized |
| 41 | No | `.isnumeric()` prevents non-numeric |
| 42 | No | `partition_col` allowlisted |
| 43 | Yes | `default_name` in COALESCE not parameterized |
| 44 | No | `int()` with error handling |
| 45 | Yes | `year_input` in CAST not parameterized |
| 46 | No | `role` allowlisted |
| 47 | No | `isinstance(int)` + range check |
| 48 | No | Proper parameterization |
| 49 | No | `.isidentifier()` allows only valid identifiers |
| 50 | No | `float()` with error handling |
| 51 | No | `int()` + parameterization |
| 52 | No | Schema allowlisted, limit type-checked |
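The string-method checks in Snippets 41 and 49 block SQL metacharacters, but they accept more than their names suggest. The short sketch below (the `describe` helper is hypothetical) shows what `.isnumeric()` and `.isidentifier()` let through compared with `int()`.

```python
def describe(text):
    """Report how several numeric checks treat the same input."""
    try:
        as_int = int(text)
    except ValueError:
        as_int = None
    return {
        "isnumeric": text.isnumeric(),
        "isdecimal": text.isdecimal(),
        "int": as_int,
    }

superscript = describe("\u00b2")  # SUPERSCRIPT TWO
fraction = describe("\u00bd")     # VULGAR FRACTION ONE HALF
plain = describe("42")

# isidentifier() follows Python's identifier rules, not SQL's.
keyword_ok = "select".isidentifier()      # SQL keywords pass
unicode_ok = "caf\u00e9".isidentifier()   # non-ASCII letters pass
spaced_ok = "a b".isidentifier()          # whitespace does not
```

None of these accepted inputs contains quotes, spaces, or semicolons, so the "not vulnerable" verdicts hold; the likely failure mode is a database error or an unexpected identifier, which is why `int()` with error handling or an explicit allowlist is usually the more predictable check.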
Snippet 40 Fix
# Vulnerable
query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
cursor.execute(query)
# Fixed (parameterize with wildcards in parameter value)
query = "SELECT * FROM articles WHERE metadata->>'tags' ILIKE %s"
cursor.execute(query, (f"%{tag_name}%",))
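Parameterizing the pattern stops the injection, but `%` and `_` inside `tag_name` still act as wildcards. If literal matching is wanted, they can be escaped before the pattern is built. The sketch below is an approximation using sqlite3 `LIKE` with an explicit `ESCAPE` clause rather than PostgreSQL `ILIKE`; `escape_like`, `search_tags`, and the demo table are hypothetical.

```python
import sqlite3

LIKE_ESCAPE = "\\"  # a single backslash

def escape_like(term):
    # Escape the escape character first, then the two LIKE wildcards.
    return (
        term.replace(LIKE_ESCAPE, LIKE_ESCAPE * 2)
        .replace("%", LIKE_ESCAPE + "%")
        .replace("_", LIKE_ESCAPE + "_")
    )

def search_tags(conn, tag_name):
    pattern = f"%{escape_like(tag_name)}%"
    # The SQL text contains ESCAPE '\' (one backslash inside the quotes).
    query = "SELECT id FROM articles WHERE tags LIKE ? ESCAPE '\\'"
    return conn.execute(query, (pattern,)).fetchall()

# Assumed demo data in an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER PRIMARY KEY, tags TEXT)")
conn.executemany(
    "INSERT INTO articles (tags) VALUES (?)", [("python",), ("100%_safe",)]
)

# Without escaping, a search for "%" becomes the pattern "%%%" and matches every row.
unescaped = conn.execute(
    "SELECT id FROM articles WHERE tags LIKE ?", ("%%%",)
).fetchall()
```

This is a correctness and denial-of-service concern rather than an injection: unescaped wildcards cannot change the query structure, but they can make a search match far more rows than intended.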
Snippet 43 Fix
# Vulnerable
query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
cursor.execute(query, (user_id,))
# Fixed (parameterize both values)
query = "SELECT COALESCE(display_name, ?) FROM users WHERE id = ?"
cursor.execute(query, (default_name, user_id))
Snippet 45 Fix
# Vulnerable
query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
cursor.execute(query)
# Fixed
query = "SELECT * FROM documents WHERE CAST(created_year AS CHAR) = %s"
cursor.execute(query, (year_input,))
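Since the input is meant to be a year, another option is to convert it to an integer first and compare numerically, which avoids the `CAST` altogether. The sketch below uses sqlite3 for portability; `parse_year`, `documents_for_year`, and the 1900 to 2100 bounds are assumptions chosen for illustration.

```python
import sqlite3

def parse_year(year_input, min_year=1900, max_year=2100):
    """Return the year as an int, or None if it is not a plausible year."""
    try:
        year = int(year_input)
    except (ValueError, TypeError):
        return None
    return year if min_year <= year <= max_year else None

def documents_for_year(conn, year_input):
    year = parse_year(year_input)
    if year is None:
        return []
    # Numeric comparison on a bound integer; no string casting needed.
    query = "SELECT id FROM documents WHERE created_year = ?"
    return conn.execute(query, (year,)).fetchall()

# Assumed demo data in an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id INTEGER PRIMARY KEY, created_year INTEGER)")
conn.executemany("INSERT INTO documents (created_year) VALUES (?)", [(2023,), (2024,)])
```

Comparing the column directly also lets the database use an index on `created_year`, which a `CAST` on the column side typically prevents.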
Score Yourself
| Score | Level |
|---|---|
| 0-20 | Beginner: review parameterization basics |
| 21-35 | Intermediate: practice the identifier vs. value distinction |
| 36-45 | Advanced: focus on subtle patterns |
| 46-52 | Expert: ready for production code review! |
Want More Exercises Like These?
I'm building an open-source repository of LeetCode-style secure coding exercises to help train the next generation of Security Engineers, and to curate high-quality secure code datasets for AI training.
Star the repo: github.com/fosres/SecEng-Exercises
What's inside:
- SQL Injection exercises (like these!)
- XSS prevention patterns
- Authentication security
- API security challenges
- 60+ test cases per exercise
Contributing
Found a bug? Want to add exercises? PRs welcome!
References
These exercises were inspired by real-world patterns documented in:
- Full Stack Python Security by Dennis Byrne (Manning, 2021), pp. 205-207
- Secure by Design by Johnsson, Deogun, Sawano (Manning, 2019), Ch. 5
- API Security in Action by Neil Madden (Manning, 2020), Ch. 2
- Hacking APIs by Corey Ball (No Starch Press, 2022), Ch. 9
- OWASP SQL Injection Prevention Cheat Sheet, owasp.org
- PortSwigger Web Security Academy, portswigger.net
Tags
#security #python #sql #webdev #beginners #tutorial #opensource #appsec
Did these exercises help you? Drop a comment with your score! And don't forget to star the GitHub repo for more security content.
