DEV Community

fosres

Week 4 SQL Injection Audit Challenge

52 SQL Injection Code Audit Exercises: From Beginner to Expert

Master SQL injection detection through hands-on code review. No exploit development required.

Whether you're preparing for a Security Engineering interview, studying for the OSWE, or building AppSec skills, these exercises will train your eye to spot vulnerable code patterns across Python's most popular database libraries.


🎯 What You'll Learn

  • Identify SQL injection vulnerabilities in sqlite3, psycopg2, mysql.connector, SQLAlchemy, and Django ORM
  • Recognize when input validation IS sufficient vs. when parameterization is required
  • Understand the difference between identifiers (column/table names) and values
  • Write secure fixes using proper parameterization patterns

📋 Prerequisites

  • Basic Python knowledge
  • Basic SQL syntax understanding
  • Familiarity with web application concepts

πŸ† Challenge Format

For each snippet, determine:

  1. Vulnerable? (Yes/No)
  2. If yes, write the fix

Scoring Guide:

  • Correct identification: 1 point
  • Correct fix (if vulnerable): 1 point

🔑 Key Concepts Before You Start

Identifiers vs Values

| Type | Examples | Can Parameterize? | Protection |
|------|----------|-------------------|------------|
| Identifier | Column names, table names, aliases, ASC/DESC | ❌ No | Allowlist validation |
| Value | Data in WHERE, LIMIT, function arguments | ✅ Yes | Parameterization |
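
As a rough illustration of how the two protections combine in one query, here is a minimal sketch using an in-memory sqlite3 database. The `list_products` function, the `products` table, and both allowlists are hypothetical names made up for this example; they do not come from any of the snippets below.

```python
import sqlite3

# Hypothetical allowlists; the table and column names are illustrative only.
ALLOWED_SORT_COLUMNS = {"name", "price"}
ALLOWED_DIRECTIONS = {"ASC", "DESC"}


def list_products(conn, max_price, sort_by="name", direction="ASC"):
    """Return (name, price) rows at or below max_price, sorted by an allowlisted column."""
    # Identifiers and keywords cannot be bound as parameters, so they are
    # checked against fixed sets before being placed in the SQL text.
    if sort_by not in ALLOWED_SORT_COLUMNS:
        sort_by = "name"
    direction = direction.upper()
    if direction not in ALLOWED_DIRECTIONS:
        direction = "ASC"

    # The value goes through a placeholder and never becomes part of the SQL text.
    query = f"SELECT name, price FROM products WHERE price <= ? ORDER BY {sort_by} {direction}"
    return conn.execute(query, (max_price,)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, price REAL)")
conn.executemany(
    "INSERT INTO products VALUES (?, ?)",
    [("widget", 5.0), ("gadget", 15.0), ("gizmo", 9.5)],
)

rows = list_products(conn, 10, sort_by="price", direction="desc")
# An unexpected sort column silently falls back to the default instead of reaching the query.
hostile = list_products(conn, 10, sort_by="price; DROP TABLE products")
```

Falling back to a default is one possible design; returning an error for an unknown column, as several snippets below do, is equally reasonable.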

Placeholder Syntax by Library

| Library | Placeholder | Example |
|---------|-------------|---------|
| sqlite3 | ? | cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,)) |
| psycopg2 | %s | cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) |
| mysql.connector | %s | cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) |
| SQLAlchemy | :param | conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id}) |
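
To see placeholders in action without a database server, the sketch below uses an in-memory sqlite3 database. DB-API modules expose their placeholder style through a `paramstyle` attribute, and sqlite3 additionally accepts named placeholders. The `users` table and its single row are assumptions made for the demo.

```python
import sqlite3

# DB-API modules advertise their placeholder style in `paramstyle`.
print(sqlite3.paramstyle)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("INSERT INTO users (username) VALUES (?)", ("alice",))

# Positional (qmark) placeholder: parameters are passed as a sequence.
by_position = conn.execute("SELECT username FROM users WHERE id = ?", (1,)).fetchone()

# sqlite3 also accepts named placeholders together with a mapping.
by_name = conn.execute("SELECT username FROM users WHERE id = :id", {"id": 1}).fetchone()

# A hostile string bound to a placeholder is compared as plain data,
# so it matches no row instead of rewriting the WHERE clause.
payload = "1 OR 1=1"
hostile = conn.execute("SELECT username FROM users WHERE id = ?", (payload,)).fetchall()
```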

Round 1: Fundamentals (Snippets 1-5)

Difficulty: Beginner

Snippet 1: Basic Authentication

import sqlite3

def authenticate_user(username, password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
    cursor.execute(query)

    user = cursor.fetchone()
    conn.close()

    if user:
        return {"status": "success", "user_id": user[0]}
    else:
        return {"status": "failed"}

Snippet 2: Product Search

import psycopg2

def search_products(search_term):
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()

    query = "SELECT name, price FROM products WHERE name ILIKE %s"
    cursor.execute(query, (f"%{search_term}%",))

    results = cursor.fetchall()
    conn.close()
    return results

Snippet 3: Account Check

import mysql.connector

def check_account_exists(email):
    conn = mysql.connector.connect(host="localhost", database="accounts")
    cursor = conn.cursor()

    query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
    cursor.execute(query)

    exists = cursor.fetchone() is not None
    conn.close()

    return {"exists": exists}

Snippet 4: Order Status Lookup

from sqlalchemy import create_engine, text

def get_order_status(order_id):
    engine = create_engine('postgresql://localhost/orders')

    with engine.connect() as conn:
        query = text("SELECT status FROM orders WHERE id = :order_id")
        result = conn.execute(query, {"order_id": order_id})

        row = result.fetchone()
        if row:
            return {"status": row[0]}
        return {"status": "not_found"}

Snippet 5: Report Generator

import sqlite3

def generate_report(table_name, date_filter):
    conn = sqlite3.connect('reports.db')
    cursor = conn.cursor()

    allowed_tables = ['sales', 'inventory', 'customers']

    if table_name not in allowed_tables:
        return {"error": "Invalid table"}

    query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"data": results}

Round 2: Mixed Patterns (Snippets 6-14)

Difficulty: Intermediate

Snippet 6: User Profile with ID Check

import sqlite3

def get_user_profile(user_id):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    if not user_id.isdigit():
        return {"error": "Invalid user ID"}

    query = f"SELECT username, email, bio FROM users WHERE id = {user_id}"
    cursor.execute(query)

    profile = cursor.fetchone()
    conn.close()

    if profile:
        return {"username": profile[0], "email": profile[1], "bio": profile[2]}
    return {"error": "User not found"}

Snippet 7: Flask Search with Sort

from flask import Flask, request
import psycopg2

app = Flask(__name__)

@app.route('/search')
def search():
    query_param = request.args.get('q', '')
    sort_by = request.args.get('sort', 'name')

    conn = psycopg2.connect("dbname=products")
    cursor = conn.cursor()

    allowed_sort = ['name', 'price', 'date_added']
    if sort_by not in allowed_sort:
        sort_by = 'name'

    query = f"SELECT * FROM products WHERE name ILIKE %s ORDER BY {sort_by}"
    cursor.execute(query, (f"%{query_param}%",))

    results = cursor.fetchall()
    conn.close()
    return {"results": results}

Snippet 8: Bulk Delete

import mysql.connector

def delete_users(user_ids: list):
    conn = mysql.connector.connect(host="localhost", database="app")
    cursor = conn.cursor()

    placeholders = ','.join(['%s'] * len(user_ids))
    query = f"DELETE FROM users WHERE id IN ({placeholders})"
    cursor.execute(query, tuple(user_ids))

    conn.commit()
    deleted_count = cursor.rowcount
    conn.close()

    return {"deleted": deleted_count}

Snippet 9: Login with Remember Token

import sqlite3
import hashlib

def login(username, password, remember_token=None):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    if remember_token:
        query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
        cursor.execute(query)
    else:
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        query = "SELECT * FROM users WHERE username = ? AND password_hash = ?"
        cursor.execute(query, (username, password_hash))

    user = cursor.fetchone()
    conn.close()

    if user:
        return {"status": "success", "user_id": user[0]}
    return {"status": "failed"}

Snippet 10: Django Connection Query

from django.db import connection

def get_orders_by_status(status, customer_id):
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT * FROM orders WHERE status = %s AND customer_id = %s",
            [status, customer_id]
        )
        rows = cursor.fetchall()

    return [{"id": row[0], "total": row[1], "status": row[2]} for row in rows]

Snippet 11: Audit Log Search

import psycopg2
from datetime import datetime

def search_audit_logs(action_type, start_date, end_date, limit=100):
    conn = psycopg2.connect("dbname=audit")
    cursor = conn.cursor()

    query = """
        SELECT timestamp, user_id, action, details 
        FROM audit_logs 
        WHERE action = %s 
        AND timestamp BETWEEN %s AND %s
        ORDER BY timestamp DESC
        LIMIT """ + str(limit)

    cursor.execute(query, (action_type, start_date, end_date))

    logs = cursor.fetchall()
    conn.close()
    return {"logs": logs}

Snippet 12: User Registration

import sqlite3
import hashlib

def register_user(username, email, password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    cursor.execute("SELECT 1 FROM users WHERE username = ?", (username,))
    if cursor.fetchone():
        conn.close()
        return {"error": "Username taken"}

    cursor.execute("SELECT 1 FROM users WHERE email = ?", (email,))
    if cursor.fetchone():
        conn.close()
        return {"error": "Email already registered"}

    password_hash = hashlib.sha256(password.encode()).hexdigest()
    cursor.execute(
        "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
        (username, email, password_hash)
    )

    conn.commit()
    user_id = cursor.lastrowid
    conn.close()

    return {"success": True, "user_id": user_id}

Snippet 13: Dynamic Report Builder

import psycopg2

def build_report(table, columns, filters):
    """
    table: string - table name
    columns: list - columns to select
    filters: dict - {column: value} pairs for WHERE clause
    """
    conn = psycopg2.connect("dbname=reports")
    cursor = conn.cursor()

    allowed_tables = ['sales', 'inventory', 'employees']
    allowed_columns = ['id', 'name', 'amount', 'date', 'department']

    if table not in allowed_tables:
        return {"error": "Invalid table"}

    safe_columns = [c for c in columns if c in allowed_columns]
    if not safe_columns:
        return {"error": "No valid columns"}

    column_str = ', '.join(safe_columns)

    where_clauses = []
    values = []
    for col, val in filters.items():
        if col in allowed_columns:
            where_clauses.append(f"{col} = %s")
            values.append(val)

    query = f"SELECT {column_str} FROM {table}"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)

    cursor.execute(query, tuple(values))
    results = cursor.fetchall()
    conn.close()

    return {"data": results}

Snippet 14: Password Reset

import sqlite3
import secrets

def request_password_reset(email):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    cursor.execute("SELECT id FROM users WHERE email = ?", (email,))
    user = cursor.fetchone()

    if not user:
        conn.close()
        return {"message": "If email exists, reset link sent"}

    reset_token = secrets.token_urlsafe(32)

    cursor.execute(
        "UPDATE users SET reset_token = ?, reset_expires = datetime('now', '+1 hour') WHERE id = ?",
        (reset_token, user[0])
    )
    conn.commit()
    conn.close()

    return {"message": "If email exists, reset link sent"}


def reset_password(token, new_password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
    cursor.execute(query)
    user = cursor.fetchone()

    if not user:
        conn.close()
        return {"error": "Invalid or expired token"}

    cursor.execute(
        "UPDATE users SET password_hash = ?, reset_token = NULL WHERE id = ?",
        (new_password, user[0])
    )
    conn.commit()
    conn.close()

    return {"success": True}

Round 3: Identifier Patterns (Snippets 15-26)

Difficulty: Intermediate-Advanced

Snippet 15: Dynamic Column Filter

import psycopg2

def filter_products(column, value):
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()

    allowed_columns = ['category', 'brand', 'status']

    if column not in allowed_columns:
        return {"error": "Invalid filter column"}

    query = f"SELECT * FROM products WHERE {column} = %s"
    cursor.execute(query, (value,))

    results = cursor.fetchall()
    conn.close()
    return {"products": results}

Snippet 16: Sort Direction

import sqlite3

def get_users_sorted(sort_dir):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
    cursor.execute(query)

    users = cursor.fetchall()
    conn.close()
    return {"users": users}

Snippet 17: Numeric ID Check

import mysql.connector

def get_order(order_id):
    conn = mysql.connector.connect(host="localhost", database="shop")
    cursor = conn.cursor()

    try:
        order_id = int(order_id)
    except ValueError:
        return {"error": "Invalid order ID"}

    query = f"SELECT * FROM orders WHERE id = {order_id}"
    cursor.execute(query)

    order = cursor.fetchone()
    conn.close()
    return {"order": order}

Snippet 18: Multiple Column Sort

import psycopg2

def search_inventory(search_term, sort_columns: list):
    conn = psycopg2.connect("dbname=warehouse")
    cursor = conn.cursor()

    allowed_sorts = ['name', 'quantity', 'price', 'updated_at']

    safe_sorts = [col for col in sort_columns if col in allowed_sorts]
    if not safe_sorts:
        safe_sorts = ['name']

    sort_str = ', '.join(safe_sorts)

    query = f"SELECT * FROM inventory WHERE name ILIKE %s ORDER BY {sort_str}"
    cursor.execute(query, (f"%{search_term}%",))

    results = cursor.fetchall()
    conn.close()
    return {"items": results}

Snippet 19: Pagination

import sqlite3

def get_posts(page, per_page):
    conn = sqlite3.connect('blog.db')
    cursor = conn.cursor()

    offset = (page - 1) * per_page

    query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
    cursor.execute(query)

    posts = cursor.fetchall()
    conn.close()
    return {"posts": posts}

Snippet 20: Schema Browser

import psycopg2

def get_table_columns(table_name):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()

    query = f"""
        SELECT column_name, data_type 
        FROM information_schema.columns 
        WHERE table_name = '{table_name}'
    """
    cursor.execute(query)

    columns = cursor.fetchall()
    conn.close()
    return {"columns": columns}

Snippet 21: Aggregate Query

import mysql.connector

def get_sales_summary(group_by_col, year):
    conn = mysql.connector.connect(host="localhost", database="sales")
    cursor = conn.cursor()

    allowed_groups = ['product_id', 'category', 'region', 'salesperson']

    if group_by_col not in allowed_groups:
        group_by_col = 'category'

    if not isinstance(year, int) or year < 2000 or year > 2100:
        return {"error": "Invalid year"}

    query = f"""
        SELECT {group_by_col}, SUM(amount) as total
        FROM sales
        WHERE YEAR(sale_date) = {year}
        GROUP BY {group_by_col}
    """
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"summary": results}

Snippet 22: User Preferences Update

import sqlite3

def update_preference(user_id, pref_key, pref_value):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    allowed_prefs = ['theme', 'language', 'timezone', 'notifications']

    if pref_key not in allowed_prefs:
        return {"error": "Invalid preference"}

    query = f"UPDATE user_preferences SET {pref_key} = ? WHERE user_id = ?"
    cursor.execute(query, (pref_value, user_id))

    conn.commit()
    conn.close()
    return {"success": True}

Snippet 23: Log Search with Date Range

import psycopg2

def search_logs(start_date, end_date, log_level):
    conn = psycopg2.connect("dbname=logging")
    cursor = conn.cursor()

    query = """
        SELECT timestamp, level, message 
        FROM logs 
        WHERE timestamp >= %s 
        AND timestamp <= %s
        AND level = '""" + log_level + "' ORDER BY timestamp DESC"

    cursor.execute(query, (start_date, end_date))

    logs = cursor.fetchall()
    conn.close()
    return {"logs": logs}

Snippet 24: Dynamic IN Clause

import sqlite3

def get_users_by_role(roles: list):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    allowed_roles = ['admin', 'editor', 'viewer', 'guest']
    safe_roles = [r for r in roles if r in allowed_roles]

    if not safe_roles:
        return {"error": "No valid roles provided"}

    placeholders = ','.join(['?'] * len(safe_roles))
    query = f"SELECT id, username, role FROM users WHERE role IN ({placeholders})"
    cursor.execute(query, tuple(safe_roles))

    users = cursor.fetchall()
    conn.close()
    return {"users": users}

Snippet 25: Conditional Column Selection

import mysql.connector

def export_users(include_email, include_phone):
    conn = mysql.connector.connect(host="localhost", database="app")
    cursor = conn.cursor()

    columns = ['id', 'username']

    if include_email:
        columns.append('email')
    if include_phone:
        columns.append('phone')

    column_str = ', '.join(columns)
    query = f"SELECT {column_str} FROM users"
    cursor.execute(query)

    users = cursor.fetchall()
    conn.close()
    return {"users": users}

Snippet 26: Boolean Filter

import psycopg2

def get_active_items(category, is_active):
    conn = psycopg2.connect("dbname=inventory")
    cursor = conn.cursor()

    if not isinstance(is_active, bool):
        return {"error": "is_active must be boolean"}

    query = f"SELECT * FROM items WHERE category = %s AND active = {is_active}"
    cursor.execute(query, (category,))

    items = cursor.fetchall()
    conn.close()
    return {"items": items}

Round 4: Complex Patterns (Snippets 27-38)

Difficulty: Advanced

Snippet 27: Search with Multiple Filters

import psycopg2

def advanced_search(filters: dict):
    """
    filters: {"name": "widget", "min_price": 10, "max_price": 100}
    """
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()

    allowed_filters = ['name', 'min_price', 'max_price', 'category', 'brand']

    where_clauses = []
    values = []

    for key, val in filters.items():
        if key not in allowed_filters:
            continue
        if key == 'name':
            where_clauses.append("name ILIKE %s")
            values.append(f"%{val}%")
        elif key == 'min_price':
            where_clauses.append("price >= %s")
            values.append(val)
        elif key == 'max_price':
            where_clauses.append("price <= %s")
            values.append(val)
        elif key in ('category', 'brand'):
            where_clauses.append(f"{key} = %s")
            values.append(val)

    query = "SELECT * FROM products"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)

    cursor.execute(query, tuple(values))
    results = cursor.fetchall()
    conn.close()
    return {"products": results}

Snippet 28: JSON Field Query

import sqlite3

def search_metadata(field_name, field_value):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
    cursor.execute(query, (field_value,))

    results = cursor.fetchall()
    conn.close()
    return {"documents": results}

Snippet 29: Bulk Status Update

import mysql.connector

def update_order_status(order_ids: list, new_status):
    conn = mysql.connector.connect(host="localhost", database="shop")
    cursor = conn.cursor()

    allowed_statuses = ['pending', 'processing', 'shipped', 'delivered', 'cancelled']

    if new_status not in allowed_statuses:
        return {"error": "Invalid status"}

    if not all(isinstance(oid, int) for oid in order_ids):
        return {"error": "Invalid order IDs"}

    placeholders = ','.join(['%s'] * len(order_ids))
    query = f"UPDATE orders SET status = %s WHERE id IN ({placeholders})"
    cursor.execute(query, (new_status, *order_ids))

    conn.commit()
    conn.close()
    return {"updated": cursor.rowcount}

Snippet 30: Column Alias

import psycopg2

def get_report(value_column, label):
    conn = psycopg2.connect("dbname=reports")
    cursor = conn.cursor()

    allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']

    if value_column not in allowed_columns:
        return {"error": "Invalid column"}

    query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"report": results}

Snippet 31: SQLAlchemy Text Query

from sqlalchemy import create_engine, text

def find_user_by_field(field, value):
    engine = create_engine('postgresql://localhost/app')

    allowed_fields = ['username', 'email', 'phone']

    if field not in allowed_fields:
        return {"error": "Invalid field"}

    with engine.connect() as conn:
        query = text(f"SELECT * FROM users WHERE {field} = :value")
        result = conn.execute(query, {"value": value})
        user = result.fetchone()

    return {"user": user}

Snippet 32: Date Range with Regex

import sqlite3
import re

def get_events(start_date, end_date):
    conn = sqlite3.connect('events.db')
    cursor = conn.cursor()

    date_pattern = r'^\d{4}-\d{2}-\d{2}$'

    if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
        return {"error": "Invalid date format. Use YYYY-MM-DD"}

    query = f"SELECT * FROM events WHERE event_date BETWEEN '{start_date}' AND '{end_date}'"
    cursor.execute(query)

    events = cursor.fetchall()
    conn.close()
    return {"events": events}

Snippet 33: Subquery Filter

import psycopg2

def get_top_customers(min_orders):
    conn = psycopg2.connect("dbname=shop")
    cursor = conn.cursor()

    try:
        min_orders = int(min_orders)
    except (ValueError, TypeError):
        return {"error": "Invalid minimum orders value"}

    query = """
        SELECT c.id, c.name, c.email
        FROM customers c
        WHERE (SELECT COUNT(*) FROM orders o WHERE o.customer_id = c.id) >= %s
    """
    cursor.execute(query, (min_orders,))

    customers = cursor.fetchall()
    conn.close()
    return {"customers": customers}

Snippet 34: LIKE with Match Types

import mysql.connector

def search_products(search_term, match_type):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()

    if match_type == 'starts_with':
        pattern = f"{search_term}%"
    elif match_type == 'ends_with':
        pattern = f"%{search_term}"
    elif match_type == 'contains':
        pattern = f"%{search_term}%"
    else:
        pattern = search_term

    query = "SELECT * FROM products WHERE name LIKE %s"
    cursor.execute(query, (pattern,))

    results = cursor.fetchall()
    conn.close()
    return {"products": results}

Snippet 35: Django ORM

from django.contrib.auth.models import User

def search_users(search_term, order_by):
    allowed_ordering = ['username', 'email', 'date_joined', '-username', '-email', '-date_joined']

    if order_by not in allowed_ordering:
        order_by = 'username'

    users = User.objects.filter(
        username__icontains=search_term
    ).order_by(order_by)

    return list(users.values('id', 'username', 'email'))

Snippet 36: Table Join

import sqlite3

def get_order_details(order_id, include_customer):
    conn = sqlite3.connect('shop.db')
    cursor = conn.cursor()

    if include_customer:
        query = """
            SELECT o.*, c.name, c.email 
            FROM orders o 
            JOIN customers c ON o.customer_id = c.id 
            WHERE o.id = ?
        """
    else:
        query = "SELECT * FROM orders WHERE id = ?"

    cursor.execute(query, (order_id,))

    order = cursor.fetchone()
    conn.close()
    return {"order": order}

Snippet 37: Regex Column Validation

import psycopg2
import re

def get_column_stats(column_name):
    conn = psycopg2.connect("dbname=analytics")
    cursor = conn.cursor()

    if not re.match(r'^[a-z_]+$', column_name):
        return {"error": "Invalid column name"}

    query = f"SELECT MIN({column_name}), MAX({column_name}), AVG({column_name}) FROM metrics"
    cursor.execute(query)

    stats = cursor.fetchone()
    conn.close()
    return {"min": stats[0], "max": stats[1], "avg": stats[2]}

Snippet 38: Cursor Pagination

import psycopg2

def get_paginated_items(cursor_id, limit, direction):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()

    if not isinstance(cursor_id, int):
        cursor_id = 0

    if not isinstance(limit, int) or limit < 1 or limit > 100:
        limit = 20

    if direction == 'next':
        operator = '>'
        order = 'ASC'
    elif direction == 'prev':
        operator = '<'
        order = 'DESC'
    else:
        return {"error": "Invalid direction"}

    query = f"SELECT * FROM items WHERE id {operator} %s ORDER BY id {order} LIMIT %s"
    cursor.execute(query, (cursor_id, limit))

    items = cursor.fetchall()
    conn.close()
    return {"items": items}

Round 5: Expert Patterns (Snippets 39-52)

Difficulty: Expert

Snippet 39: UUID Validation

import sqlite3
import re

def get_session(session_id):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    uuid_pattern = r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'

    if not re.match(uuid_pattern, session_id):
        return {"error": "Invalid session ID"}

    query = f"SELECT * FROM sessions WHERE id = '{session_id}'"
    cursor.execute(query)

    session = cursor.fetchone()
    conn.close()
    return {"session": session}

Snippet 40: PostgreSQL JSON Operator

import psycopg2

def search_by_tag(tag_name):
    conn = psycopg2.connect("dbname=content")
    cursor = conn.cursor()

    query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"articles": results}

Snippet 41: Numeric String Check

import mysql.connector

def get_product(product_id):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()

    if not str(product_id).isnumeric():
        return {"error": "Invalid product ID"}

    query = f"SELECT * FROM products WHERE id = {product_id}"
    cursor.execute(query)

    product = cursor.fetchone()
    conn.close()
    return {"product": product}

Snippet 42: Window Function

import psycopg2

def get_ranked_sales(partition_col):
    conn = psycopg2.connect("dbname=sales")
    cursor = conn.cursor()

    allowed_partitions = ['region', 'category', 'salesperson', 'quarter']

    if partition_col not in allowed_partitions:
        return {"error": "Invalid partition column"}

    query = f"""
        SELECT *, RANK() OVER (PARTITION BY {partition_col} ORDER BY amount DESC) as rank
        FROM sales
    """
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"sales": results}

Snippet 43: COALESCE Function

import sqlite3

def get_user_display_name(user_id, default_name):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
    cursor.execute(query, (user_id,))

    result = cursor.fetchone()
    conn.close()
    return {"display_name": result[0] if result else None}

Snippet 44: Offset Pagination with Validation

import psycopg2

def get_comments(post_id, page):
    conn = psycopg2.connect("dbname=blog")
    cursor = conn.cursor()

    try:
        page = int(page)
        if page < 1:
            page = 1
    except (ValueError, TypeError):
        page = 1

    offset = (page - 1) * 20

    query = f"SELECT * FROM comments WHERE post_id = %s ORDER BY created_at LIMIT 20 OFFSET {offset}"
    cursor.execute(query, (post_id,))

    comments = cursor.fetchall()
    conn.close()
    return {"comments": comments}

Snippet 45: CAST Function

import mysql.connector

def search_by_year(year_input):
    conn = mysql.connector.connect(host="localhost", database="archive")
    cursor = conn.cursor()

    query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"documents": results}

Snippet 46: PostgreSQL ANY Operator

import psycopg2

def find_users_with_role(role):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()

    allowed_roles = ['admin', 'editor', 'viewer', 'moderator']

    if role not in allowed_roles:
        return {"error": "Invalid role"}

    query = f"SELECT * FROM users WHERE '{role}' = ANY(roles)"
    cursor.execute(query)

    users = cursor.fetchall()
    conn.close()
    return {"users": users}

Snippet 47: INTERVAL Expression

import psycopg2

def get_recent_activity(hours_ago):
    conn = psycopg2.connect("dbname=activity")
    cursor = conn.cursor()

    if not isinstance(hours_ago, int) or hours_ago < 1 or hours_ago > 168:
        hours_ago = 24

    query = f"SELECT * FROM activity_log WHERE timestamp > NOW() - INTERVAL '{hours_ago} hours'"
    cursor.execute(query)

    activities = cursor.fetchall()
    conn.close()
    return {"activities": activities}

Snippet 48: Simple Parameterization

import sqlite3

def search_full_name(first_name, last_name):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = "SELECT * FROM users WHERE first_name = ? AND last_name = ?"
    cursor.execute(query, (first_name, last_name))

    users = cursor.fetchall()
    conn.close()
    return {"users": users}

Snippet 49: Table Function with isidentifier

import psycopg2

def get_table_size(table_name):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()

    if not table_name.isidentifier():
        return {"error": "Invalid table name"}

    query = f"SELECT pg_size_pretty(pg_total_relation_size('{table_name}'))"
    cursor.execute(query)

    size = cursor.fetchone()
    conn.close()
    return {"size": size[0]}

Snippet 50: Float Validation

import mysql.connector

def get_products_under_price(max_price):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()

    try:
        max_price = float(max_price)
    except (ValueError, TypeError):
        return {"error": "Invalid price"}

    query = f"SELECT * FROM products WHERE price <= {max_price}"
    cursor.execute(query)

    products = cursor.fetchall()
    conn.close()
    return {"products": products}

Snippet 51: EXISTS Subquery

import sqlite3

def get_authors_with_posts(min_posts):
    conn = sqlite3.connect('blog.db')
    cursor = conn.cursor()

    try:
        min_posts = int(min_posts)
    except ValueError:
        return {"error": "Invalid minimum posts"}

    query = """
        SELECT * FROM authors a
        WHERE EXISTS (
            SELECT 1 FROM posts p 
            WHERE p.author_id = a.id 
            GROUP BY p.author_id 
            HAVING COUNT(*) >= ?
        )
    """
    cursor.execute(query, (min_posts,))

    authors = cursor.fetchall()
    conn.close()
    return {"authors": authors}

Snippet 52: Schema-Qualified Table

import psycopg2

def get_data(schema_name, limit):
    conn = psycopg2.connect("dbname=warehouse")
    cursor = conn.cursor()

    allowed_schemas = ['public', 'staging', 'archive']

    if schema_name not in allowed_schemas:
        return {"error": "Invalid schema"}

    if not isinstance(limit, int) or limit < 1:
        limit = 100

    query = f"SELECT * FROM {schema_name}.reports ORDER BY created_at DESC LIMIT {limit}"
    cursor.execute(query)

    data = cursor.fetchall()
    conn.close()
    return {"data": data}

πŸ“ Answer Key


Round 1 Answers

| Snippet | Vulnerable? | Reason |
| --- | --- | --- |
| 1 | ✅ Yes | f-string interpolation of username and password |
| 2 | ❌ No | Properly parameterized with %s |
| 3 | ✅ Yes | String concatenation of email |
| 4 | ❌ No | SQLAlchemy :param parameterization |
| 5 | ✅ Yes | date_filter interpolated via f-string (table is allowlisted) |

Snippet 1 Fix

# Vulnerable
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)

# Fixed
query = "SELECT * FROM users WHERE username = ? AND password = ?"
cursor.execute(query, (username, password))
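
To see why the Snippet 1 fix matters, here is a minimal sketch that runs both query styles against an in-memory SQLite database. The table layout, the sample user, and the helper names (make_db, login_vulnerable, login_fixed) are assumptions made for illustration; they are not part of the original exercise.

```python
import sqlite3

def make_db():
    # Hypothetical schema and sample row, for demonstration only.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (username TEXT, password TEXT)")
    conn.execute("INSERT INTO users VALUES (?, ?)", ("alice", "s3cret"))
    return conn

def login_vulnerable(conn, username, password):
    # User input becomes part of the SQL text itself.
    query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
    return conn.execute(query).fetchall()

def login_fixed(conn, username, password):
    # User input is bound as data and never parsed as SQL.
    query = "SELECT * FROM users WHERE username = ? AND password = ?"
    return conn.execute(query, (username, password)).fetchall()

conn = make_db()
payload = "' OR '1'='1"

print(login_vulnerable(conn, "alice", payload))  # [('alice', 's3cret')]
print(login_fixed(conn, "alice", payload))       # []
```

The payload closes the password literal and appends an always-true condition, so the vulnerable version returns the row without the real password. The parameterized version compares the whole payload against the stored password and finds nothing.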

Snippet 3 Fix

# Vulnerable
query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
cursor.execute(query)

# Fixed
query = "SELECT 1 FROM users WHERE email = %s LIMIT 1"
cursor.execute(query, (email,))

Snippet 5 Fix

# Vulnerable
query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
cursor.execute(query)

# Fixed (table_name already allowlisted, just parameterize date_filter)
query = f"SELECT * FROM {table_name} WHERE created_at > ?"
cursor.execute(query, (date_filter,))

Round 2 Answers

| Snippet | Vulnerable? | Reason |
| --- | --- | --- |
| 6 | ❌ No | .isdigit() prevents non-numeric input |
| 7 | ❌ No | sort_by allowlisted, query_param parameterized |
| 8 | ❌ No | Placeholders generated safely, values parameterized |
| 9 | ✅ Yes | remember_token branch uses f-string |
| 10 | ❌ No | Django parameterization with %s |
| 11 | ✅ Yes | limit concatenated via str() |
| 12 | ❌ No | All queries use ? parameterization |
| 13 | ❌ No | All identifiers allowlisted, values parameterized |
| 14 | ✅ Yes | reset_password function uses f-string for token |

Snippet 9 Fix

# Vulnerable
if remember_token:
    query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
    cursor.execute(query)

# Fixed
if remember_token:
    query = "SELECT * FROM users WHERE remember_token = ?"
    cursor.execute(query, (remember_token,))

Snippet 11 Fix

# Vulnerable
query = """
    SELECT timestamp, user_id, action, details 
    FROM audit_logs 
    WHERE action = %s 
    AND timestamp BETWEEN %s AND %s
    ORDER BY timestamp DESC
    LIMIT """ + str(limit)
cursor.execute(query, (action_type, start_date, end_date))

# Fixed
query = """
    SELECT timestamp, user_id, action, details 
    FROM audit_logs 
    WHERE action = %s 
    AND timestamp BETWEEN %s AND %s
    ORDER BY timestamp DESC
    LIMIT %s
"""
cursor.execute(query, (action_type, start_date, end_date, limit))

Snippet 14 Fix (reset_password function)

# Vulnerable
query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
cursor.execute(query)

# Fixed
query = "SELECT id FROM users WHERE reset_token = ? AND reset_expires > datetime('now')"
cursor.execute(query, (token,))
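
The Snippet 8 verdict ("Placeholders generated safely, values parameterized") describes a pattern worth seeing in isolation. The sketch below is one way to build an IN list with sqlite3; the users table and the fetch_users_by_ids helper are hypothetical and only illustrate the idea that the interpolated text is made of ? characters and nothing else.

```python
import sqlite3

def fetch_users_by_ids(conn, user_ids):
    # An empty IN () list is a syntax error in many databases, so bail out early.
    if not user_ids:
        return []
    # Only "?" characters are interpolated; their count depends on len(user_ids),
    # never on the content of the values.
    placeholders = ", ".join("?" for _ in user_ids)
    query = f"SELECT id, name FROM users WHERE id IN ({placeholders}) ORDER BY id"
    return conn.execute(query, list(user_ids)).fetchall()

# Hypothetical sample data for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "ann"), (2, "bob"), (3, "cy")])

print(fetch_users_by_ids(conn, [1, 3]))        # [(1, 'ann'), (3, 'cy')]
print(fetch_users_by_ids(conn, ["1 OR 1=1"]))  # []
```

The f-string here is safe because the only dynamic part is a run of placeholders. The hostile string in the second call is compared as a value and matches no row.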

Round 3 Answers

| Snippet | Vulnerable? | Reason |
| --- | --- | --- |
| 15 | ❌ No | column allowlisted, value parameterized |
| 16 | ✅ Yes | sort_dir not validated |
| 17 | ❌ No | int() conversion prevents injection |
| 18 | ❌ No | Columns filtered against allowlist |
| 19 | ✅ Yes | page and per_page not validated |
| 20 | ✅ Yes | table_name interpolated without validation |
| 21 | ❌ No | group_by_col allowlisted, year validated with isinstance(int) |
| 22 | ❌ No | pref_key allowlisted, values parameterized |
| 23 | ✅ Yes | log_level concatenated |
| 24 | ❌ No | Roles filtered against allowlist, then parameterized |
| 25 | ❌ No | Boolean flags control hardcoded columns |
| 26 | ❌ No | isinstance(bool) only allows True/False |

Snippet 16 Fix

# Vulnerable
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
cursor.execute(query)

# Fixed (allowlist validation; ASC/DESC cannot be parameterized)
allowed_directions = ['ASC', 'DESC']
if sort_dir.upper() not in allowed_directions:
    sort_dir = 'ASC'

query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir.upper()}"
cursor.execute(query)
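
A variation on the Snippet 16 fix is to look the keyword up in a dictionary, so the text that reaches the query always comes from a constant in your own code rather than from the request. This is a sketch under that assumption; SORT_DIRECTIONS and build_user_listing_query are illustrative names, not part of the original snippet.

```python
# Maps normalized user input to the exact SQL keyword we are willing to emit.
SORT_DIRECTIONS = {"asc": "ASC", "desc": "DESC"}

def build_user_listing_query(sort_dir):
    # Anything that is not a known key falls back to ascending order.
    direction = SORT_DIRECTIONS.get(str(sort_dir).strip().lower(), "ASC")
    return f"SELECT id, username, created_at FROM users ORDER BY created_at {direction}"

print(build_user_listing_query("desc"))
print(build_user_listing_query("DESC; DROP TABLE users"))
```

The second call falls back to ASC because the whole input string is not a key in the mapping. The same lookup style extends to column names and other identifiers.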

Snippet 19 Fix

# Vulnerable
offset = (page - 1) * per_page
query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
cursor.execute(query)

# Fixed (add validation + parameterization)
try:
    page = int(page)
    per_page = int(per_page)
    if page < 1:
        page = 1
    if per_page < 1 or per_page > 100:
        per_page = 20
except (ValueError, TypeError):
    page, per_page = 1, 20

offset = (page - 1) * per_page
query = "SELECT * FROM posts ORDER BY created_at DESC LIMIT ? OFFSET ?"
cursor.execute(query, (per_page, offset))

Snippet 20 Fix

# Vulnerable
query = f"""
    SELECT column_name, data_type 
    FROM information_schema.columns 
    WHERE table_name = '{table_name}'
"""
cursor.execute(query)

# Fixed
query = """
    SELECT column_name, data_type 
    FROM information_schema.columns 
    WHERE table_name = %s
"""
cursor.execute(query, (table_name,))

Snippet 23 Fix

# Vulnerable
query = """
    SELECT timestamp, level, message 
    FROM logs 
    WHERE timestamp >= %s 
    AND timestamp <= %s
    AND level = '""" + log_level + "' ORDER BY timestamp DESC"
cursor.execute(query, (start_date, end_date))

# Fixed
query = """
    SELECT timestamp, level, message 
    FROM logs 
    WHERE timestamp >= %s 
    AND timestamp <= %s
    AND level = %s
    ORDER BY timestamp DESC
"""
cursor.execute(query, (start_date, end_date, log_level))
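
Several "not vulnerable" verdicts in this answer key rest on isinstance(value, int), for example the year check in Snippet 21. One Python detail is worth knowing when you review such code: bool is a subclass of int, so True and False pass the check. That does not open an injection path, because they can only render as True or False, but it may still produce a query you did not intend. The is_strict_int helper below is a hypothetical sketch of a tighter check.

```python
def is_strict_int(value):
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool)

print(isinstance(True, int))  # True
print(f"LIMIT {True}")        # LIMIT True
print(is_strict_int(True))    # False
print(is_strict_int(25))      # True
```

Whether you need the stricter check depends on where the value comes from. Query-string parameters arrive as str and fail isinstance(int) outright, while JSON bodies can deliver real booleans.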

Round 4 Answers

| Snippet | Vulnerable? | Reason |
| --- | --- | --- |
| 27 | ❌ No | Keys allowlisted, values parameterized |
| 28 | ✅ Yes | field_name in JSON path not validated |
| 29 | ❌ No | Status allowlisted, IDs type-checked |
| 30 | ✅ Yes | label (alias) not validated |
| 31 | ❌ No | field allowlisted, value parameterized |
| 32 | ❌ No | Strict regex allows only YYYY-MM-DD |
| 33 | ❌ No | int() + parameterization |
| 34 | ❌ No | Pattern built safely, parameterized |
| 35 | ❌ No | Django ORM + allowlisted ordering |
| 36 | ❌ No | Boolean selects hardcoded queries |
| 37 | ❌ No | Strict regex ^[a-z_]+$ |
| 38 | ❌ No | Type checks + hardcoded operators |

Snippet 28 Fix

# Vulnerable
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))

# Fixed (allowlist validation restricts field_name to known JSON keys)
allowed_fields = ['author', 'category', 'status', 'priority']
if field_name not in allowed_fields:
    return {"error": "Invalid field name"}

query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
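
In SQLite specifically, the path argument of json_extract is an ordinary SQL value, so it can be bound with ? in addition to being allowlisted. The sketch below combines both, assuming your SQLite build includes the JSON functions (most Python distributions do). The documents table contents and the find_documents helper are made up for illustration.

```python
import sqlite3

ALLOWED_FIELDS = {"author", "category", "status", "priority"}

def find_documents(conn, field_name, field_value):
    # The allowlist decides which JSON keys callers may query at all.
    if field_name not in ALLOWED_FIELDS:
        return {"error": "Invalid field name"}
    # The JSON path is passed as a bound value, so the SQL text is constant.
    query = "SELECT id FROM documents WHERE json_extract(metadata, ?) = ?"
    rows = conn.execute(query, (f"$.{field_name}", field_value)).fetchall()
    return {"documents": rows}

# Hypothetical sample data for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id INTEGER PRIMARY KEY, metadata TEXT)")
conn.execute(
    "INSERT INTO documents VALUES (1, ?)",
    ('{"author": "ann", "status": "draft"}',),
)

print(find_documents(conn, "author", "ann"))
print(find_documents(conn, "author') OR 1=1 --", "x"))
```

Binding the path removes the injection risk, and the allowlist still earns its place by limiting which parts of the JSON document a caller can probe.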

Snippet 30 Fix

# Vulnerable
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)

# Fixed (allowlist validation; aliases cannot be parameterized)
allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']
allowed_labels = ['total', 'amount', 'value', 'metric', 'result']

if value_column not in allowed_columns:
    return {"error": "Invalid column"}
if label not in allowed_labels:
    return {"error": "Invalid label"}

query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
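
Snippets 32 and 37 are marked safe because of strict regular expressions. When you audit regex validation in Python, check how the pattern is anchored: $ also matches just before a trailing newline, so a $-anchored pattern used with re.match accepts input that ends in "\n". A newline alone is unlikely to be exploitable in these queries, but re.fullmatch removes the ambiguity. The sketch below assumes the ^[a-z_]+$ pattern quoted in the answer key; the two helper names are hypothetical.

```python
import re

LOOSE = re.compile(r"^[a-z_]+$")
STRICT = re.compile(r"[a-z_]+")

def is_valid_loose(name):
    # "$" matches at the end of the string or right before a final newline.
    return LOOSE.match(name) is not None

def is_valid_strict(name):
    # fullmatch requires the entire string to match the pattern.
    return STRICT.fullmatch(name) is not None

print(is_valid_loose("created_at\n"))   # True
print(is_valid_strict("created_at\n"))  # False
print(is_valid_strict("created_at"))    # True
```

Using \Z instead of $ in the pattern has the same effect as fullmatch for this case.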

Round 5 Answers

| Snippet | Vulnerable? | Reason |
| --- | --- | --- |
| 39 | ❌ No | UUID regex only allows hex + hyphens |
| 40 | ✅ Yes | tag_name in ILIKE not parameterized |
| 41 | ❌ No | .isnumeric() prevents non-numeric |
| 42 | ❌ No | partition_col allowlisted |
| 43 | ✅ Yes | default_name in COALESCE not parameterized |
| 44 | ❌ No | int() with error handling |
| 45 | ✅ Yes | year_input in CAST not parameterized |
| 46 | ❌ No | role allowlisted |
| 47 | ❌ No | isinstance(int) + range check |
| 48 | ❌ No | Proper parameterization |
| 49 | ❌ No | .isidentifier() allows only valid identifiers |
| 50 | ❌ No | float() with error handling |
| 51 | ❌ No | int() + parameterization |
| 52 | ❌ No | Schema allowlisted, limit type-checked |

Snippet 40 Fix

# Vulnerable
query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
cursor.execute(query)

# Fixed (parameterize with wildcards in parameter value)
query = "SELECT * FROM articles WHERE metadata->>'tags' ILIKE %s"
cursor.execute(query, (f"%{tag_name}%",))
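
Parameterizing the pattern stops SQL injection, but % and _ inside tag_name still act as LIKE wildcards. That is a search-behavior issue rather than an injection, and you may want to escape them. The sketch below uses sqlite3 and LIKE as a stand-in for the psycopg2 ILIKE query in Snippet 40; the articles table, its tags column, and the escape_like and search_tags helpers are assumptions for illustration.

```python
import sqlite3

def escape_like(term, escape_char="\\"):
    # Escape the escape character first, then the two LIKE wildcards.
    return (
        term.replace(escape_char, escape_char * 2)
        .replace("%", escape_char + "%")
        .replace("_", escape_char + "_")
    )

def search_tags(conn, tag_name):
    pattern = f"%{escape_like(tag_name)}%"
    # ESCAPE tells the database which character marks a literal % or _.
    query = "SELECT title FROM articles WHERE tags LIKE ? ESCAPE '\\' ORDER BY title"
    return conn.execute(query, (pattern,)).fetchall()

# Hypothetical sample data for the demonstration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (title TEXT, tags TEXT)")
conn.executemany(
    "INSERT INTO articles VALUES (?, ?)",
    [("a", "python,sql"), ("b", "100% coverage")],
)

print(search_tags(conn, "sql"))  # [('a',)]
print(search_tags(conn, "%"))    # [('b',)]
```

Without the escaping step, searching for "%" would match every row instead of only the one that contains a literal percent sign.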

Snippet 43 Fix

# Vulnerable
query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
cursor.execute(query, (user_id,))

# Fixed (parameterize both values)
query = "SELECT COALESCE(display_name, ?) FROM users WHERE id = ?"
cursor.execute(query, (default_name, user_id))

Snippet 45 Fix

# Vulnerable
query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
cursor.execute(query)

# Fixed
query = "SELECT * FROM documents WHERE CAST(created_year AS CHAR) = %s"
cursor.execute(query, (year_input,))
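
Snippet 50 is marked safe because float() rejects anything that is not a number. One detail to keep in mind during review: float() also accepts "nan", "inf", and "infinity", which render into the f-string as bare words. They cannot carry an injection payload, but they will likely cause a query error or an unintended comparison. The parse_price helper below is a hypothetical sketch that rejects them; the non-negative rule is an assumption about what a price filter should allow.

```python
import math

def parse_price(raw):
    try:
        price = float(raw)
    except (ValueError, TypeError):
        return None
    # Reject nan/inf and, as an assumed business rule, negative prices.
    if not math.isfinite(price) or price < 0:
        return None
    return price

print(f"price <= {float('nan')}")  # price <= nan
print(parse_price("nan"))          # None
print(parse_price("19.99"))        # 19.99
```

Binding the validated number as a parameter instead of formatting it into the query sidesteps the rendering question entirely.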

📊 Score Yourself

| Score | Level |
| --- | --- |
| 0-20 | Beginner: review parameterization basics |
| 21-35 | Intermediate: practice the identifier vs. value distinction |
| 36-45 | Advanced: focus on subtle patterns |
| 46-52 | Expert: ready for production code review! |

🚀 Want More Exercises Like These?

I'm building an open-source repository of LeetCode-style secure coding exercises to help train the next generation of Security Engineers, and to curate high-quality secure code datasets for AI training.

⭐ Star the repo: github.com/fosres/SecEng-Exercises

What's inside:

  • πŸ” SQL Injection exercises (like these!)
  • πŸ›‘οΈ XSS prevention patterns
  • πŸ”‘ Authentication security
  • πŸ“‘ API security challenges
  • πŸ§ͺ 60+ test cases per exercise

Contributing

Found a bug? Want to add exercises? PRs welcome!


📚 References

These exercises were inspired by real-world patterns documented in:

  • Full Stack Python Security by Dennis Byrne (Manning, 2021), pp. 205-207
  • Secure by Design by Johnsson, Deogun, Sawano (Manning, 2019), Ch. 5
  • API Security in Action by Neil Madden (Manning, 2020), Ch. 2
  • Hacking APIs by Corey Ball (No Starch Press, 2022), Ch. 9
  • OWASP SQL Injection Prevention Cheat Sheet, owasp.org
  • PortSwigger Web Security Academy, portswigger.net

🏷️ Tags

#security #python #sql #webdev #beginners #tutorial #opensource #appsec


Did these exercises help you? Drop a comment with your score! And don't forget to ⭐ the GitHub repo for more security content.
