DEV Community

fosres
fosres

Posted on

Week 4 SQL Injection Audit Challenge

66 SQL Injection Code Audit Exercises: From Beginner to Expert πŸ”

SQL Injection Security

Master SQL injection detection through hands-on code review β€” no exploit development required.

Whether you're preparing for a Security Engineering interview, studying for the OSWE, or building AppSec skills, these exercises will train your eye to spot vulnerable code patterns across Python's most popular database libraries.


🎯 What You'll Learn

  • Identify SQL injection vulnerabilities in sqlite3, psycopg2, mysql.connector, SQLAlchemy, and Django ORM
  • Recognize when input validation IS sufficient vs. when parameterization is required
  • Understand the difference between identifiers (column/table names) and values
  • Write secure fixes using proper parameterization patterns

πŸ“‹ Prerequisites

  • Basic Python knowledge
  • Basic SQL syntax understanding
  • Familiarity with web application concepts

πŸ† Challenge Format

For each snippet, determine:

  1. Vulnerable? (Yes/No)
  2. If yes, write the fix

Scoring Guide:

  • Correct identification: 1 point
  • Correct fix (if vulnerable): 1 point

πŸ”‘ Key Concepts Before You Start

Identifiers vs Values

Type Examples Can Parameterize? Protection
Identifier Column names, table names, aliases, ASC/DESC ❌ No Allowlist validation
Value Data in WHERE, LIMIT, function arguments βœ… Yes Parameterization

Placeholder Syntax by Library

Library Placeholder Example
sqlite3 ? cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
psycopg2 %s cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
mysql.connector %s cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
SQLAlchemy :param conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id})

Round 1: Fundamentals (Snippets 1-5)

Difficulty: Beginner

Snippet 1: Basic Authentication

import sqlite3

def authenticate_user(username, password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
    cursor.execute(query)

    user = cursor.fetchone()
    conn.close()

    if user:
        return {"status": "success", "user_id": user[0]}
    else:
        return {"status": "failed"}
Enter fullscreen mode Exit fullscreen mode

Snippet 2: Product Search

import psycopg2

def search_products(search_term):
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()

    query = "SELECT name, price FROM products WHERE name ILIKE %s"
    cursor.execute(query, (f"%{search_term}%",))

    results = cursor.fetchall()
    conn.close()
    return results
Enter fullscreen mode Exit fullscreen mode

Snippet 3: Account Check

import mysql.connector

def check_account_exists(email):
    conn = mysql.connector.connect(host="localhost", database="accounts")
    cursor = conn.cursor()

    query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
    cursor.execute(query)

    exists = cursor.fetchone() is not None
    conn.close()

    return {"exists": exists}
Enter fullscreen mode Exit fullscreen mode

Snippet 4: Order Status Lookup

from sqlalchemy import create_engine, text

def get_order_status(order_id):
    engine = create_engine('postgresql://localhost/orders')

    with engine.connect() as conn:
        query = text("SELECT status FROM orders WHERE id = :order_id")
        result = conn.execute(query, {"order_id": order_id})

        row = result.fetchone()
        if row:
            return {"status": row[0]}
        return {"status": "not_found"}
Enter fullscreen mode Exit fullscreen mode

Snippet 5: Report Generator

import sqlite3

def generate_report(table_name, date_filter):
    conn = sqlite3.connect('reports.db')
    cursor = conn.cursor()

    allowed_tables = ['sales', 'inventory', 'customers']

    if table_name not in allowed_tables:
        return {"error": "Invalid table"}

    query = f"SELECT * FROM {table_name} WHERE created_at > '{date_filter}'"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"data": results}
Enter fullscreen mode Exit fullscreen mode

Round 2: Mixed Patterns (Snippets 6-14)

Difficulty: Intermediate

Snippet 6: User Profile with ID Check

import sqlite3

def get_user_profile(user_id):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    if not user_id.isdigit():
        return {"error": "Invalid user ID"}

    query = f"SELECT username, email, bio FROM users WHERE id = {user_id}"
    cursor.execute(query)

    profile = cursor.fetchone()
    conn.close()

    if profile:
        return {"username": profile[0], "email": profile[1], "bio": profile[2]}
    return {"error": "User not found"}
Enter fullscreen mode Exit fullscreen mode

Snippet 7: Flask Search with Sort

from flask import Flask, request
import psycopg2

app = Flask(__name__)

@app.route('/search')
def search():
    query_param = request.args.get('q', '')
    sort_by = request.args.get('sort', 'name')

    conn = psycopg2.connect("dbname=products")
    cursor = conn.cursor()

    allowed_sort = ['name', 'price', 'date_added']
    if sort_by not in allowed_sort:
        sort_by = 'name'

    query = f"SELECT * FROM products WHERE name ILIKE %s ORDER BY {sort_by}"
    cursor.execute(query, (f"%{query_param}%",))

    results = cursor.fetchall()
    conn.close()
    return {"results": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 8: Bulk Delete

import mysql.connector

def delete_users(user_ids: list):
    conn = mysql.connector.connect(host="localhost", database="app")
    cursor = conn.cursor()

    placeholders = ','.join(['%s'] * len(user_ids))
    query = f"DELETE FROM users WHERE id IN ({placeholders})"
    cursor.execute(query, tuple(user_ids))

    conn.commit()
    deleted_count = cursor.rowcount
    conn.close()

    return {"deleted": deleted_count}
Enter fullscreen mode Exit fullscreen mode

Snippet 9: Login with Remember Token

import sqlite3
import hashlib

def login(username, password, remember_token=None):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    if remember_token:
        query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
        cursor.execute(query)
    else:
        password_hash = hashlib.sha256(password.encode()).hexdigest()
        query = "SELECT * FROM users WHERE username = ? AND password_hash = ?"
        cursor.execute(query, (username, password_hash))

    user = cursor.fetchone()
    conn.close()

    if user:
        return {"status": "success", "user_id": user[0]}
    return {"status": "failed"}
Enter fullscreen mode Exit fullscreen mode

Snippet 10: Django Connection Query

from django.db import connection

def get_orders_by_status(status, customer_id):
    with connection.cursor() as cursor:
        cursor.execute(
            "SELECT * FROM orders WHERE status = %s AND customer_id = %s",
            [status, customer_id]
        )
        rows = cursor.fetchall()

    return [{"id": row[0], "total": row[1], "status": row[2]} for row in rows]
Enter fullscreen mode Exit fullscreen mode

Snippet 11: Audit Log Search

import psycopg2
from datetime import datetime

def search_audit_logs(action_type, start_date, end_date, limit=100):
    conn = psycopg2.connect("dbname=audit")
    cursor = conn.cursor()

    query = """
        SELECT timestamp, user_id, action, details 
        FROM audit_logs 
        WHERE action = %s 
        AND timestamp BETWEEN %s AND %s
        ORDER BY timestamp DESC
        LIMIT """ + str(limit)

    cursor.execute(query, (action_type, start_date, end_date))

    logs = cursor.fetchall()
    conn.close()
    return {"logs": logs}
Enter fullscreen mode Exit fullscreen mode

Snippet 12: User Registration

import sqlite3
import hashlib

def register_user(username, email, password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    cursor.execute("SELECT 1 FROM users WHERE username = ?", (username,))
    if cursor.fetchone():
        conn.close()
        return {"error": "Username taken"}

    cursor.execute("SELECT 1 FROM users WHERE email = ?", (email,))
    if cursor.fetchone():
        conn.close()
        return {"error": "Email already registered"}

    password_hash = hashlib.sha256(password.encode()).hexdigest()
    cursor.execute(
        "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
        (username, email, password_hash)
    )

    conn.commit()
    user_id = cursor.lastrowid
    conn.close()

    return {"success": True, "user_id": user_id}
Enter fullscreen mode Exit fullscreen mode

Snippet 13: Dynamic Report Builder

import psycopg2

def build_report(table, columns, filters):
    """
    table: string - table name
    columns: list - columns to select
    filters: dict - {column: value} pairs for WHERE clause
    """
    conn = psycopg2.connect("dbname=reports")
    cursor = conn.cursor()

    allowed_tables = ['sales', 'inventory', 'employees']
    allowed_columns = ['id', 'name', 'amount', 'date', 'department']

    if table not in allowed_tables:
        return {"error": "Invalid table"}

    safe_columns = [c for c in columns if c in allowed_columns]
    if not safe_columns:
        return {"error": "No valid columns"}

    column_str = ', '.join(safe_columns)

    where_clauses = []
    values = []
    for col, val in filters.items():
        if col in allowed_columns:
            where_clauses.append(f"{col} = %s")
            values.append(val)

    query = f"SELECT {column_str} FROM {table}"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)

    cursor.execute(query, tuple(values))
    results = cursor.fetchall()
    conn.close()

    return {"data": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 14: Password Reset

import sqlite3
import secrets

def request_password_reset(email):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    cursor.execute("SELECT id FROM users WHERE email = ?", (email,))
    user = cursor.fetchone()

    if not user:
        conn.close()
        return {"message": "If email exists, reset link sent"}

    reset_token = secrets.token_urlsafe(32)

    cursor.execute(
        "UPDATE users SET reset_token = ?, reset_expires = datetime('now', '+1 hour') WHERE id = ?",
        (reset_token, user[0])
    )
    conn.commit()
    conn.close()

    return {"message": "If email exists, reset link sent"}


def reset_password(token, new_password):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires > datetime('now')"
    cursor.execute(query)
    user = cursor.fetchone()

    if not user:
        conn.close()
        return {"error": "Invalid or expired token"}

    cursor.execute(
        "UPDATE users SET password_hash = ?, reset_token = NULL WHERE id = ?",
        (new_password, user[0])
    )
    conn.commit()
    conn.close()

    return {"success": True}
Enter fullscreen mode Exit fullscreen mode

Round 3: Identifier Patterns (Snippets 15-26)

Difficulty: Intermediate-Advanced

Snippet 15: Dynamic Column Filter

import psycopg2

def filter_products(column, value):
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()

    allowed_columns = ['category', 'brand', 'status']

    if column not in allowed_columns:
        return {"error": "Invalid filter column"}

    query = f"SELECT * FROM products WHERE {column} = %s"
    cursor.execute(query, (value,))

    results = cursor.fetchall()
    conn.close()
    return {"products": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 16: Sort Direction

import sqlite3

def get_users_sorted(sort_dir):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
    cursor.execute(query)

    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Enter fullscreen mode Exit fullscreen mode

Snippet 17: Numeric ID Check

import mysql.connector

def get_order(order_id):
    conn = mysql.connector.connect(host="localhost", database="shop")
    cursor = conn.cursor()

    try:
        order_id = int(order_id)
    except ValueError:
        return {"error": "Invalid order ID"}

    query = f"SELECT * FROM orders WHERE id = {order_id}"
    cursor.execute(query)

    order = cursor.fetchone()
    conn.close()
    return {"order": order}
Enter fullscreen mode Exit fullscreen mode

Snippet 18: Multiple Column Sort

import psycopg2

def search_inventory(search_term, sort_columns: list):
    conn = psycopg2.connect("dbname=warehouse")
    cursor = conn.cursor()

    allowed_sorts = ['name', 'quantity', 'price', 'updated_at']

    safe_sorts = [col for col in sort_columns if col in allowed_sorts]
    if not safe_sorts:
        safe_sorts = ['name']

    sort_str = ', '.join(safe_sorts)

    query = f"SELECT * FROM inventory WHERE name ILIKE %s ORDER BY {sort_str}"
    cursor.execute(query, (f"%{search_term}%",))

    results = cursor.fetchall()
    conn.close()
    return {"items": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 19: Pagination

import sqlite3

def get_posts(page, per_page):
    conn = sqlite3.connect('blog.db')
    cursor = conn.cursor()

    offset = (page - 1) * per_page

    query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
    cursor.execute(query)

    posts = cursor.fetchall()
    conn.close()
    return {"posts": posts}
Enter fullscreen mode Exit fullscreen mode

Snippet 20: Schema Browser

import psycopg2

def get_table_columns(table_name):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()

    query = f"""
        SELECT column_name, data_type 
        FROM information_schema.columns 
        WHERE table_name = '{table_name}'
    """
    cursor.execute(query)

    columns = cursor.fetchall()
    conn.close()
    return {"columns": columns}
Enter fullscreen mode Exit fullscreen mode

Snippet 21: Aggregate Query

import mysql.connector

def get_sales_summary(group_by_col, year):
    conn = mysql.connector.connect(host="localhost", database="sales")
    cursor = conn.cursor()

    allowed_groups = ['product_id', 'category', 'region', 'salesperson']

    if group_by_col not in allowed_groups:
        group_by_col = 'category'

    if not isinstance(year, int) or year < 2000 or year > 2100:
        return {"error": "Invalid year"}

    query = f"""
        SELECT {group_by_col}, SUM(amount) as total
        FROM sales
        WHERE YEAR(sale_date) = {year}
        GROUP BY {group_by_col}
    """
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"summary": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 22: User Preferences Update

import sqlite3

def update_preference(user_id, pref_key, pref_value):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    allowed_prefs = ['theme', 'language', 'timezone', 'notifications']

    if pref_key not in allowed_prefs:
        return {"error": "Invalid preference"}

    query = f"UPDATE user_preferences SET {pref_key} = ? WHERE user_id = ?"
    cursor.execute(query, (pref_value, user_id))

    conn.commit()
    conn.close()
    return {"success": True}
Enter fullscreen mode Exit fullscreen mode

Snippet 23: Log Search with Date Range

import psycopg2

def search_logs(start_date, end_date, log_level):
    conn = psycopg2.connect("dbname=logging")
    cursor = conn.cursor()

    query = """
        SELECT timestamp, level, message 
        FROM logs 
        WHERE timestamp >= %s 
        AND timestamp <= %s
        AND level = '""" + log_level + "' ORDER BY timestamp DESC"

    cursor.execute(query, (start_date, end_date))

    logs = cursor.fetchall()
    conn.close()
    return {"logs": logs}
Enter fullscreen mode Exit fullscreen mode

Snippet 24: Dynamic IN Clause

import sqlite3

def get_users_by_role(roles: list):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    allowed_roles = ['admin', 'editor', 'viewer', 'guest']
    safe_roles = [r for r in roles if r in allowed_roles]

    if not safe_roles:
        return {"error": "No valid roles provided"}

    placeholders = ','.join(['?'] * len(safe_roles))
    query = f"SELECT id, username, role FROM users WHERE role IN ({placeholders})"
    cursor.execute(query, tuple(safe_roles))

    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Enter fullscreen mode Exit fullscreen mode

Snippet 25: Conditional Column Selection

import mysql.connector

def export_users(include_email, include_phone):
    conn = mysql.connector.connect(host="localhost", database="app")
    cursor = conn.cursor()

    columns = ['id', 'username']

    if include_email:
        columns.append('email')
    if include_phone:
        columns.append('phone')

    column_str = ', '.join(columns)
    query = f"SELECT {column_str} FROM users"
    cursor.execute(query)

    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Enter fullscreen mode Exit fullscreen mode

Snippet 26: Boolean Filter

import psycopg2

def get_active_items(category, is_active):
    conn = psycopg2.connect("dbname=inventory")
    cursor = conn.cursor()

    if not isinstance(is_active, bool):
        return {"error": "is_active must be boolean"}

    query = f"SELECT * FROM items WHERE category = %s AND active = {is_active}"
    cursor.execute(query, (category,))

    items = cursor.fetchall()
    conn.close()
    return {"items": items}
Enter fullscreen mode Exit fullscreen mode

Round 4: Complex Patterns (Snippets 27-38)

Difficulty: Advanced

Snippet 27: Search with Multiple Filters

import psycopg2

def advanced_search(filters: dict):
    """
    filters: {"name": "widget", "min_price": 10, "max_price": 100}
    """
    conn = psycopg2.connect("dbname=store")
    cursor = conn.cursor()

    allowed_filters = ['name', 'min_price', 'max_price', 'category', 'brand']

    where_clauses = []
    values = []

    for key, val in filters.items():
        if key not in allowed_filters:
            continue
        if key == 'name':
            where_clauses.append("name ILIKE %s")
            values.append(f"%{val}%")
        elif key == 'min_price':
            where_clauses.append("price >= %s")
            values.append(val)
        elif key == 'max_price':
            where_clauses.append("price <= %s")
            values.append(val)
        elif key in ('category', 'brand'):
            where_clauses.append(f"{key} = %s")
            values.append(val)

    query = "SELECT * FROM products"
    if where_clauses:
        query += " WHERE " + " AND ".join(where_clauses)

    cursor.execute(query, tuple(values))
    results = cursor.fetchall()
    conn.close()
    return {"products": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 28: JSON Field Query

import sqlite3

def search_metadata(field_name, field_value):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
    cursor.execute(query, (field_value,))

    results = cursor.fetchall()
    conn.close()
    return {"documents": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 29: Bulk Status Update

import mysql.connector

def update_order_status(order_ids: list, new_status):
    conn = mysql.connector.connect(host="localhost", database="shop")
    cursor = conn.cursor()

    allowed_statuses = ['pending', 'processing', 'shipped', 'delivered', 'cancelled']

    if new_status not in allowed_statuses:
        return {"error": "Invalid status"}

    if not all(isinstance(oid, int) for oid in order_ids):
        return {"error": "Invalid order IDs"}

    placeholders = ','.join(['%s'] * len(order_ids))
    query = f"UPDATE orders SET status = %s WHERE id IN ({placeholders})"
    cursor.execute(query, (new_status, *order_ids))

    conn.commit()
    conn.close()
    return {"updated": cursor.rowcount}
Enter fullscreen mode Exit fullscreen mode

Snippet 30: Column Alias

import psycopg2

def get_report(value_column, label):
    conn = psycopg2.connect("dbname=reports")
    cursor = conn.cursor()

    allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']

    if value_column not in allowed_columns:
        return {"error": "Invalid column"}

    query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"report": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 31: SQLAlchemy Text Query

from sqlalchemy import create_engine, text

def find_user_by_field(field, value):
    engine = create_engine('postgresql://localhost/app')

    allowed_fields = ['username', 'email', 'phone']

    if field not in allowed_fields:
        return {"error": "Invalid field"}

    with engine.connect() as conn:
        query = text(f"SELECT * FROM users WHERE {field} = :value")
        result = conn.execute(query, {"value": value})
        user = result.fetchone()

    return {"user": user}
Enter fullscreen mode Exit fullscreen mode

Snippet 32: Date Range with Regex

import sqlite3
import re

def get_events(start_date, end_date):
    conn = sqlite3.connect('events.db')
    cursor = conn.cursor()

    date_pattern = r'^\d{4}-\d{2}-\d{2}$'

    if not re.match(date_pattern, start_date) or not re.match(date_pattern, end_date):
        return {"error": "Invalid date format. Use YYYY-MM-DD"}

    query = f"SELECT * FROM events WHERE event_date BETWEEN '{start_date}' AND '{end_date}'"
    cursor.execute(query)

    events = cursor.fetchall()
    conn.close()
    return {"events": events}
Enter fullscreen mode Exit fullscreen mode

Snippet 33: Subquery Filter

import psycopg2

def get_top_customers(min_orders):
    conn = psycopg2.connect("dbname=shop")
    cursor = conn.cursor()

    try:
        min_orders = int(min_orders)
    except (ValueError, TypeError):
        return {"error": "Invalid minimum orders value"}

    query = """
        SELECT c.id, c.name, c.email
        FROM customers c
        WHERE (SELECT COUNT(*) FROM orders o WHERE o.customer_id = c.id) >= %s
    """
    cursor.execute(query, (min_orders,))

    customers = cursor.fetchall()
    conn.close()
    return {"customers": customers}
Enter fullscreen mode Exit fullscreen mode

Snippet 34: LIKE with Match Types

import mysql.connector

def search_products(search_term, match_type):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()

    if match_type == 'starts_with':
        pattern = f"{search_term}%"
    elif match_type == 'ends_with':
        pattern = f"%{search_term}"
    elif match_type == 'contains':
        pattern = f"%{search_term}%"
    else:
        pattern = search_term

    query = "SELECT * FROM products WHERE name LIKE %s"
    cursor.execute(query, (pattern,))

    results = cursor.fetchall()
    conn.close()
    return {"products": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 35: Django ORM

from django.contrib.auth.models import User

def search_users(search_term, order_by):
    allowed_ordering = ['username', 'email', 'date_joined', '-username', '-email', '-date_joined']

    if order_by not in allowed_ordering:
        order_by = 'username'

    users = User.objects.filter(
        username__icontains=search_term
    ).order_by(order_by)

    return list(users.values('id', 'username', 'email'))
Enter fullscreen mode Exit fullscreen mode

Snippet 36: Table Join

import sqlite3

def get_order_details(order_id, include_customer):
    conn = sqlite3.connect('shop.db')
    cursor = conn.cursor()

    if include_customer:
        query = """
            SELECT o.*, c.name, c.email 
            FROM orders o 
            JOIN customers c ON o.customer_id = c.id 
            WHERE o.id = ?
        """
    else:
        query = "SELECT * FROM orders WHERE id = ?"

    cursor.execute(query, (order_id,))

    order = cursor.fetchone()
    conn.close()
    return {"order": order}
Enter fullscreen mode Exit fullscreen mode

Snippet 37: Regex Column Validation

import psycopg2
import re

def get_column_stats(column_name):
    conn = psycopg2.connect("dbname=analytics")
    cursor = conn.cursor()

    if not re.match(r'^[a-z_]+$', column_name):
        return {"error": "Invalid column name"}

    query = f"SELECT MIN({column_name}), MAX({column_name}), AVG({column_name}) FROM metrics"
    cursor.execute(query)

    stats = cursor.fetchone()
    conn.close()
    return {"min": stats[0], "max": stats[1], "avg": stats[2]}
Enter fullscreen mode Exit fullscreen mode

Snippet 38: Cursor Pagination

import psycopg2

def get_paginated_items(cursor_id, limit, direction):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()

    if not isinstance(cursor_id, int):
        cursor_id = 0

    if not isinstance(limit, int) or limit < 1 or limit > 100:
        limit = 20

    if direction == 'next':
        operator = '>'
        order = 'ASC'
    elif direction == 'prev':
        operator = '<'
        order = 'DESC'
    else:
        return {"error": "Invalid direction"}

    query = f"SELECT * FROM items WHERE id {operator} %s ORDER BY id {order} LIMIT %s"
    cursor.execute(query, (cursor_id, limit))

    items = cursor.fetchall()
    conn.close()
    return {"items": items}
Enter fullscreen mode Exit fullscreen mode

Round 5: Expert Patterns (Snippets 39-52)

Difficulty: Expert

Snippet 39: UUID Validation

import sqlite3
import re

def get_session(session_id):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    uuid_pattern = r'^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$'

    if not re.match(uuid_pattern, session_id):
        return {"error": "Invalid session ID"}

    query = f"SELECT * FROM sessions WHERE id = '{session_id}'"
    cursor.execute(query)

    session = cursor.fetchone()
    conn.close()
    return {"session": session}
Enter fullscreen mode Exit fullscreen mode

Snippet 40: PostgreSQL JSON Operator

import psycopg2

def search_by_tag(tag_name):
    conn = psycopg2.connect("dbname=content")
    cursor = conn.cursor()

    query = f"SELECT * FROM articles WHERE metadata->>'tags' ILIKE '%{tag_name}%'"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"articles": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 41: Numeric String Check

import mysql.connector

def get_product(product_id):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()

    if not str(product_id).isnumeric():
        return {"error": "Invalid product ID"}

    query = f"SELECT * FROM products WHERE id = {product_id}"
    cursor.execute(query)

    product = cursor.fetchone()
    conn.close()
    return {"product": product}
Enter fullscreen mode Exit fullscreen mode

Snippet 42: Window Function

import psycopg2

def get_ranked_sales(partition_col):
    conn = psycopg2.connect("dbname=sales")
    cursor = conn.cursor()

    allowed_partitions = ['region', 'category', 'salesperson', 'quarter']

    if partition_col not in allowed_partitions:
        return {"error": "Invalid partition column"}

    query = f"""
        SELECT *, RANK() OVER (PARTITION BY {partition_col} ORDER BY amount DESC) as rank
        FROM sales
    """
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"sales": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 43: COALESCE Function

import sqlite3

def get_user_display_name(user_id, default_name):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
    cursor.execute(query, (user_id,))

    result = cursor.fetchone()
    conn.close()
    return {"display_name": result[0] if result else None}
Enter fullscreen mode Exit fullscreen mode

Snippet 44: Offset Pagination with Validation

import psycopg2

def get_comments(post_id, page):
    conn = psycopg2.connect("dbname=blog")
    cursor = conn.cursor()

    try:
        page = int(page)
        if page < 1:
            page = 1
    except (ValueError, TypeError):
        page = 1

    offset = (page - 1) * 20

    query = f"SELECT * FROM comments WHERE post_id = %s ORDER BY created_at LIMIT 20 OFFSET {offset}"
    cursor.execute(query, (post_id,))

    comments = cursor.fetchall()
    conn.close()
    return {"comments": comments}
Enter fullscreen mode Exit fullscreen mode

Snippet 45: CAST Function

import mysql.connector

def search_by_year(year_input):
    conn = mysql.connector.connect(host="localhost", database="archive")
    cursor = conn.cursor()

    query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return {"documents": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 46: PostgreSQL ANY Operator

import psycopg2

def find_users_with_role(role):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()

    allowed_roles = ['admin', 'editor', 'viewer', 'moderator']

    if role not in allowed_roles:
        return {"error": "Invalid role"}

    query = f"SELECT * FROM users WHERE '{role}' = ANY(roles)"
    cursor.execute(query)

    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Enter fullscreen mode Exit fullscreen mode

Snippet 47: INTERVAL Expression

import psycopg2

def get_recent_activity(hours_ago):
    conn = psycopg2.connect("dbname=activity")
    cursor = conn.cursor()

    if not isinstance(hours_ago, int) or hours_ago < 1 or hours_ago > 168:
        hours_ago = 24

    query = f"SELECT * FROM activity_log WHERE timestamp > NOW() - INTERVAL '{hours_ago} hours'"
    cursor.execute(query)

    activities = cursor.fetchall()
    conn.close()
    return {"activities": activities}
Enter fullscreen mode Exit fullscreen mode

Snippet 48: Simple Parameterization

import sqlite3

def search_full_name(first_name, last_name):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    query = "SELECT * FROM users WHERE first_name = ? AND last_name = ?"
    cursor.execute(query, (first_name, last_name))

    users = cursor.fetchall()
    conn.close()
    return {"users": users}
Enter fullscreen mode Exit fullscreen mode

Snippet 49: Table Function with isidentifier

import psycopg2

def get_table_size(table_name):
    conn = psycopg2.connect("dbname=app")
    cursor = conn.cursor()

    if not table_name.isidentifier():
        return {"error": "Invalid table name"}

    query = f"SELECT pg_size_pretty(pg_total_relation_size('{table_name}'))"
    cursor.execute(query)

    size = cursor.fetchone()
    conn.close()
    return {"size": size[0]}
Enter fullscreen mode Exit fullscreen mode

Snippet 50: Float Validation

import mysql.connector

def get_products_under_price(max_price):
    conn = mysql.connector.connect(host="localhost", database="store")
    cursor = conn.cursor()

    try:
        max_price = float(max_price)
    except (ValueError, TypeError):
        return {"error": "Invalid price"}

    query = f"SELECT * FROM products WHERE price <= {max_price}"
    cursor.execute(query)

    products = cursor.fetchall()
    conn.close()
    return {"products": products}
Enter fullscreen mode Exit fullscreen mode

Snippet 51: EXISTS Subquery

import sqlite3

def get_authors_with_posts(min_posts):
    conn = sqlite3.connect('blog.db')
    cursor = conn.cursor()

    try:
        min_posts = int(min_posts)
    except ValueError:
        return {"error": "Invalid minimum posts"}

    query = """
        SELECT * FROM authors a
        WHERE EXISTS (
            SELECT 1 FROM posts p 
            WHERE p.author_id = a.id 
            GROUP BY p.author_id 
            HAVING COUNT(*) >= ?
        )
    """
    cursor.execute(query, (min_posts,))

    authors = cursor.fetchall()
    conn.close()
    return {"authors": authors}
Enter fullscreen mode Exit fullscreen mode

Snippet 52: Schema-Qualified Table

import psycopg2

def get_data(schema_name, limit):
    conn = psycopg2.connect("dbname=warehouse")
    cursor = conn.cursor()

    allowed_schemas = ['public', 'staging', 'archive']

    if schema_name not in allowed_schemas:
        return {"error": "Invalid schema"}

    if not isinstance(limit, int) or limit < 1:
        limit = 100

    query = f"SELECT * FROM {schema_name}.reports ORDER BY created_at DESC LIMIT {limit}"
    cursor.execute(query)

    data = cursor.fetchall()
    conn.close()
    return {"data": data}
Enter fullscreen mode Exit fullscreen mode

πŸ“ Answer Key

Click to reveal answers

Round 1 Answers

Snippet Vulnerable? Reason
1 βœ… Yes f-string interpolation of username and password
2 ❌ No Properly parameterized with %s
3 βœ… Yes String concatenation of email
4 ❌ No SQLAlchemy :param parameterization
5 βœ… Yes date_filter interpolated via f-string (table is allowlisted)

Snippet 1 Fix

# Vulnerable
query = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
cursor.execute(query)

# Fixed
query = "SELECT * FROM users WHERE username = ? AND password = ?"
cursor.execute(query, (username, password))
Enter fullscreen mode Exit fullscreen mode

Snippet 3 Fix

# Vulnerable
query = "SELECT 1 FROM users WHERE email = '" + email + "' LIMIT 1"
cursor.execute(query)

# Fixed
query = "SELECT 1 FROM users WHERE email = %s LIMIT 1"
cursor.execute(query, (email,))
Enter fullscreen mode Exit fullscreen mode

Snippet 5 Fix

# Vulnerable
query = f"SELECT * FROM {table_name} WHERE created_at &gt; '{date_filter}'"
cursor.execute(query)

# Fixed (table_name already allowlisted, just parameterize date_filter)
query = f"SELECT * FROM {table_name} WHERE created_at &gt; ?"
cursor.execute(query, (date_filter,))
Enter fullscreen mode Exit fullscreen mode

Round 2 Answers

Snippet Vulnerable? Reason
6 ❌ No .isdigit() prevents non-numeric input
7 ❌ No sort_by allowlisted, query_param parameterized
8 ❌ No Placeholders generated safely, values parameterized
9 βœ… Yes remember_token branch uses f-string
10 ❌ No Django parameterization with %s
11 βœ… Yes limit concatenated via str()
12 ❌ No All queries use ? parameterization
13 ❌ No All identifiers allowlisted, values parameterized
14 βœ… Yes reset_password function uses f-string for token

Snippet 9 Fix

# Vulnerable
if remember_token:
    query = f"SELECT * FROM users WHERE remember_token = '{remember_token}'"
    cursor.execute(query)

# Fixed
if remember_token:
    query = "SELECT * FROM users WHERE remember_token = ?"
    cursor.execute(query, (remember_token,))
Enter fullscreen mode Exit fullscreen mode

Snippet 11 Fix

# Vulnerable
query = """
    SELECT timestamp, user_id, action, details 
    FROM audit_logs 
    WHERE action = %s 
    AND timestamp BETWEEN %s AND %s
    ORDER BY timestamp DESC
    LIMIT """ + str(limit)
cursor.execute(query, (action_type, start_date, end_date))

# Fixed
query = """
    SELECT timestamp, user_id, action, details 
    FROM audit_logs 
    WHERE action = %s 
    AND timestamp BETWEEN %s AND %s
    ORDER BY timestamp DESC
    LIMIT %s
"""
cursor.execute(query, (action_type, start_date, end_date, limit))
Enter fullscreen mode Exit fullscreen mode

Snippet 14 Fix (reset_password function)

# Vulnerable
query = f"SELECT id FROM users WHERE reset_token = '{token}' AND reset_expires &gt; datetime('now')"
cursor.execute(query)

# Fixed
query = "SELECT id FROM users WHERE reset_token = ? AND reset_expires &gt; datetime('now')"
cursor.execute(query, (token,))
Enter fullscreen mode Exit fullscreen mode

Round 3 Answers

Snippet Vulnerable? Reason
15 ❌ No column allowlisted, value parameterized
16 βœ… Yes sort_dir not validated
17 ❌ No int() conversion prevents injection
18 ❌ No Columns filtered against allowlist
19 βœ… Yes page and per_page not validated
20 βœ… Yes table_name interpolated without validation
21 ❌ No group_by_col allowlisted, year validated with isinstance(int)
22 ❌ No pref_key allowlisted, values parameterized
23 βœ… Yes log_level concatenated
24 ❌ No Roles filtered against allowlist, then parameterized
25 ❌ No Boolean flags control hardcoded columns
26 ❌ No isinstance(bool) only allows True/False

Snippet 16 Fix

# Vulnerable
query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir}"
cursor.execute(query)

# Fixed (allowlist validation β€” cannot parameterize ASC/DESC)
allowed_directions = ['ASC', 'DESC']
if sort_dir.upper() not in allowed_directions:
    sort_dir = 'ASC'

query = f"SELECT id, username, created_at FROM users ORDER BY created_at {sort_dir.upper()}"
cursor.execute(query)
Enter fullscreen mode Exit fullscreen mode

Snippet 19 Fix

# Vulnerable
offset = (page - 1) * per_page
query = f"SELECT * FROM posts ORDER BY created_at DESC LIMIT {per_page} OFFSET {offset}"
cursor.execute(query)

# Fixed (add validation + parameterization)
try:
    page = int(page)
    per_page = int(per_page)
    if page &lt; 1:
        page = 1
    if per_page &lt; 1 or per_page &gt; 100:
        per_page = 20
except (ValueError, TypeError):
    page, per_page = 1, 20

offset = (page - 1) * per_page
query = "SELECT * FROM posts ORDER BY created_at DESC LIMIT ? OFFSET ?"
cursor.execute(query, (per_page, offset))
Enter fullscreen mode Exit fullscreen mode

Snippet 20 Fix

# Vulnerable
query = f"""
    SELECT column_name, data_type 
    FROM information_schema.columns 
    WHERE table_name = '{table_name}'
"""
cursor.execute(query)

# Fixed
query = """
    SELECT column_name, data_type 
    FROM information_schema.columns 
    WHERE table_name = %s
"""
cursor.execute(query, (table_name,))
Enter fullscreen mode Exit fullscreen mode

Snippet 23 Fix

# Vulnerable
query = """
    SELECT timestamp, level, message 
    FROM logs 
    WHERE timestamp &gt;= %s 
    AND timestamp &lt;= %s
    AND level = '""" + log_level + "' ORDER BY timestamp DESC"
cursor.execute(query, (start_date, end_date))

# Fixed
query = """
    SELECT timestamp, level, message 
    FROM logs 
    WHERE timestamp &gt;= %s 
    AND timestamp &lt;= %s
    AND level = %s
    ORDER BY timestamp DESC
"""
cursor.execute(query, (start_date, end_date, log_level))
Enter fullscreen mode Exit fullscreen mode

Round 4 Answers

Snippet Vulnerable? Reason
27 ❌ No Keys allowlisted, values parameterized
28 βœ… Yes field_name in JSON path not validated
29 ❌ No Status allowlisted, IDs type-checked
30 βœ… Yes label (alias) not validated
31 ❌ No field allowlisted, value parameterized
32 ❌ No Strict regex allows only YYYY-MM-DD
33 ❌ No int() + parameterization
34 ❌ No Pattern built safely, parameterized
35 ❌ No Django ORM + allowlisted ordering
36 ❌ No Boolean selects hardcoded queries
37 ❌ No Strict regex ^[a-z_]+$
38 ❌ No Type checks + hardcoded operators

Snippet 28 Fix

# Vulnerable
query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))

# Fixed (allowlist validation β€” cannot parameterize JSON paths)
allowed_fields = ['author', 'category', 'status', 'priority']
if field_name not in allowed_fields:
    return {"error": "Invalid field name"}

query = f"SELECT * FROM documents WHERE json_extract(metadata, '$.{field_name}') = ?"
cursor.execute(query, (field_value,))
Enter fullscreen mode Exit fullscreen mode

Snippet 30 Fix

# Vulnerable
query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)

# Fixed (allowlist validation β€” cannot parameterize aliases)
allowed_columns = ['revenue', 'expenses', 'profit', 'units_sold']
allowed_labels = ['total', 'amount', 'value', 'metric', 'result']

if value_column not in allowed_columns:
    return {"error": "Invalid column"}
if label not in allowed_labels:
    return {"error": "Invalid label"}

query = f"SELECT date, {value_column} AS {label} FROM monthly_reports ORDER BY date"
cursor.execute(query)
Enter fullscreen mode Exit fullscreen mode

Round 5 Answers

Snippet Vulnerable? Reason
39 ❌ No UUID regex only allows hex + hyphens
40 βœ… Yes tag_name in ILIKE not parameterized
41 ❌ No .isnumeric() prevents non-numeric
42 ❌ No partition_col allowlisted
43 βœ… Yes default_name in COALESCE not parameterized
44 ❌ No int() with error handling
45 βœ… Yes year_input in CAST not parameterized
46 ❌ No role allowlisted
47 ❌ No isinstance(int) + range check
48 ❌ No Proper parameterization
49 ❌ No .isidentifier() allows only valid identifiers
50 ❌ No float() with error handling
51 ❌ No int() + parameterization
52 ❌ No Schema allowlisted, limit type-checked

Snippet 40 Fix

# Vulnerable
query = f"SELECT * FROM articles WHERE metadata-&gt;&gt;'tags' ILIKE '%{tag_name}%'"
cursor.execute(query)

# Fixed (parameterize with wildcards in parameter value)
query = "SELECT * FROM articles WHERE metadata-&gt;&gt;'tags' ILIKE %s"
cursor.execute(query, (f"%{tag_name}%",))
Enter fullscreen mode Exit fullscreen mode

Snippet 43 Fix

# Vulnerable
query = f"SELECT COALESCE(display_name, '{default_name}') FROM users WHERE id = ?"
cursor.execute(query, (user_id,))

# Fixed (parameterize both values)
query = "SELECT COALESCE(display_name, ?) FROM users WHERE id = ?"
cursor.execute(query, (default_name, user_id))
Enter fullscreen mode Exit fullscreen mode

Snippet 45 Fix

# Vulnerable
query = f"SELECT * FROM documents WHERE CAST(created_year AS CHAR) = '{year_input}'"
cursor.execute(query)

# Fixed
query = "SELECT * FROM documents WHERE CAST(created_year AS CHAR) = %s"
cursor.execute(query, (year_input,))
Enter fullscreen mode Exit fullscreen mode

πŸ“Š Score Yourself (Rounds 1-5)

Score Level
0-20 Beginner β€” Review parameterization basics
21-35 Intermediate β€” Practice identifier vs value distinction
36-45 Advanced β€” Focus on subtle patterns
46-52 Expert β€” Ready for Round 6!

Round 6: Advanced Exploitation Patterns (Snippets 53-66)

Difficulty: Expert

Snippet 53: Django Raw SQL Query

from django.contrib.auth.models import User

def get_users_by_role(role_name):
    sql = "SELECT id, username FROM auth_user WHERE first_name = '%s'" % role_name
    users = User.objects.raw(sql)

    return [{"id": u.id, "username": u.username} for u in users]
Enter fullscreen mode Exit fullscreen mode

Snippet 54: Second-Order SQL Injection Storage

import sqlite3

def update_user_preferences(user_id, display_name):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()

    # First query - stores user input
    query1 = "UPDATE users SET display_name = ? WHERE id = ?"
    cursor.execute(query1, (display_name, user_id))
    conn.commit()

    # Second query - retrieves and uses stored data
    query2 = "SELECT display_name FROM users WHERE id = ?"
    cursor.execute(query2, (user_id,))
    name = cursor.fetchone()[0]

    # Third query - uses retrieved data in dynamic SQL
    query3 = f"INSERT INTO audit_log (action, details) VALUES ('name_change', 'New name: {name}')"
    cursor.execute(query3)
    conn.commit()
    conn.close()

    return {"status": "updated"}
Enter fullscreen mode Exit fullscreen mode

Snippet 55: PostgreSQL Array Operations

import psycopg2

def search_by_tags(tag_list: list):
    conn = psycopg2.connect("dbname=blog")
    cursor = conn.cursor()

    # Convert Python list to PostgreSQL array format
    tags_array = '{' + ','.join(tag_list) + '}'

    query = f"SELECT * FROM posts WHERE tags && ARRAY{tags_array}::text[]"
    cursor.execute(query)

    results = cursor.fetchall()
    conn.close()
    return results
Enter fullscreen mode Exit fullscreen mode

Snippet 56: Time-Based Attack Context

import mysql.connector
import time

def check_user_exists_timed(username):
    conn = mysql.connector.connect(host="localhost", database="app")
    cursor = conn.cursor()

    # Validate username format first
    if not username.replace('_', '').replace('-', '').isalnum():
        return {"error": "Invalid username format"}

    query = f"SELECT COUNT(*) FROM users WHERE username = '{username}'"

    start = time.time()
    cursor.execute(query)
    result = cursor.fetchone()[0]
    elapsed = time.time() - start

    conn.close()

    return {"exists": result > 0, "query_time": elapsed}
Enter fullscreen mode Exit fullscreen mode

Snippet 57: Django Database Connection Query

from django.db import connection

def delete_expired_sessions(session_type, days_old):
    sql = f"""DELETE FROM auth_session 
              WHERE session_type = %s 
              AND expire_date < DATE_SUB(NOW(), INTERVAL {days_old} DAY)"""

    with connection.cursor() as cursor:
        cursor.execute(sql, [session_type])
        deleted = cursor.rowcount

    return {"deleted": deleted}
Enter fullscreen mode Exit fullscreen mode

Snippet 58: PostgreSQL JSONB Query

import psycopg2

def search_metadata(field_name, field_value):
    conn = psycopg2.connect("dbname=documents")
    cursor = conn.cursor()

    allowed_fields = ['status', 'priority', 'category', 'author']

    if field_name not in allowed_fields:
        return {"error": "Invalid field"}

    query = f"SELECT * FROM documents WHERE metadata->>'{field_name}' = %s"
    cursor.execute(query, (field_value,))

    results = cursor.fetchall()
    conn.close()
    return results
Enter fullscreen mode Exit fullscreen mode

Snippet 59: Stored Procedure Call

import mysql.connector

def generate_report_via_sp(report_type, start_date, end_date):
    conn = mysql.connector.connect(host="localhost", database="analytics")
    cursor = conn.cursor()

    allowed_types = ['sales', 'inventory', 'users', 'transactions']

    if report_type not in allowed_types:
        return {"error": "Invalid report type"}

    proc_name = f"generate_{report_type}_report"

    # Call stored procedure
    cursor.callproc(proc_name, [start_date, end_date])

    results = []
    for result in cursor.stored_results():
        results.extend(result.fetchall())

    conn.close()
    return {"data": results}
Enter fullscreen mode Exit fullscreen mode

Snippet 60: Error-Based Blind SQLi Context

import sqlite3

def validate_license_key(license_key):
    conn = sqlite3.connect('licenses.db')
    cursor = conn.cursor()

    # Basic format validation
    if len(license_key) != 36 or license_key.count('-') != 4:
        return {"valid": False, "error": "Invalid format"}

    try:
        query = f"SELECT 1 FROM licenses WHERE key = '{license_key}' AND status = 'active'"
        cursor.execute(query)
        result = cursor.fetchone()
        conn.close()

        if result:
            return {"valid": True}
        return {"valid": False, "error": "Invalid or inactive license"}

    except sqlite3.Error as e:
        conn.close()
        return {"valid": False, "error": str(e)}
Enter fullscreen mode Exit fullscreen mode

Snippet 61: Complex Subquery Pattern

import psycopg2

def get_top_customers(metric, limit):
    conn = psycopg2.connect("dbname=sales")
    cursor = conn.cursor()

    allowed_metrics = ['revenue', 'orders', 'items_purchased']

    if metric not in allowed_metrics:
        metric = 'revenue'

    try:
        limit_val = int(limit)
        if limit_val < 1 or limit_val > 100:
            limit_val = 10
    except (ValueError, TypeError):
        limit_val = 10

    query = f"""
        SELECT customer_id, SUM({metric}) as total
        FROM (
            SELECT customer_id, {metric}
            FROM orders
            WHERE status = 'completed'
        ) subquery
        GROUP BY customer_id
        ORDER BY total DESC
        LIMIT ?
    """

    cursor.execute(query, (limit_val,))
    results = cursor.fetchall()
    conn.close()

    return results
Enter fullscreen mode Exit fullscreen mode

Snippet 62: Batch Update with Transaction

import mysql.connector

def batch_update_prices(price_updates: dict):
    """
    price_updates: {"product_123": 29.99, "product_456": 49.99}
    """
    conn = mysql.connector.connect(host="localhost", database="inventory")
    cursor = conn.cursor()

    try:
        conn.start_transaction()

        for product_id, new_price in price_updates.items():
            # Validate product_id format
            if not product_id.startswith('product_'):
                continue

            # Extract numeric ID
            numeric_id = product_id.replace('product_', '')

            if not numeric_id.isdigit():
                continue

            query = f"UPDATE products SET price = %s WHERE id = {numeric_id}"
            cursor.execute(query, (new_price,))

        conn.commit()
        updated = cursor.rowcount

    except Exception as e:
        conn.rollback()
        return {"error": str(e)}
    finally:
        conn.close()

    return {"updated": updated}
Enter fullscreen mode Exit fullscreen mode

Snippet 63: Dynamic Table Selection with CTE

import psycopg2

def get_monthly_summary(table_suffix, year, month):
    conn = psycopg2.connect("dbname=analytics")
    cursor = conn.cursor()

    # Validate year and month
    try:
        year = int(year)
        month = int(month)
        if not (2020 <= year <= 2030) or not (1 <= month <= 12):
            return {"error": "Invalid date range"}
    except (ValueError, TypeError):
        return {"error": "Invalid date format"}

    # Validate table suffix
    allowed_suffixes = ['revenue', 'expenses', 'profit']
    if table_suffix not in allowed_suffixes:
        return {"error": "Invalid table"}

    table_name = f"monthly_{table_suffix}"

    query = f"""
        WITH monthly_data AS (
            SELECT * FROM {table_name}
            WHERE year = %s AND month = %s
        )
        SELECT SUM(amount) as total FROM monthly_data
    """

    cursor.execute(query, (year, month))
    result = cursor.fetchone()
    conn.close()

    return {"total": result[0] if result else 0}
Enter fullscreen mode Exit fullscreen mode

Snippet 64: XML Path Context

import sqlite3

def query_xml_field(doc_id, xpath_expr):
    conn = sqlite3.connect('documents.db')
    cursor = conn.cursor()

    # Validate doc_id is numeric
    if not str(doc_id).isdigit():
        return {"error": "Invalid document ID"}

    # Allowlist common XPath expressions
    allowed_paths = [
        '//title',
        '//author',
        '//date',
        '//content',
        '/document/metadata/tags'
    ]

    if xpath_expr not in allowed_paths:
        return {"error": "Invalid XPath expression"}

    query = f"SELECT xpath('{xpath_expr}', xml_content) FROM documents WHERE id = ?"
    cursor.execute(query, (doc_id,))

    result = cursor.fetchone()
    conn.close()

    return {"data": result[0] if result else None}
Enter fullscreen mode Exit fullscreen mode

Snippet 65: NoSQL-Style Operators in SQL

import psycopg2

def find_users_by_criteria(filter_op, field, value):
    conn = psycopg2.connect("dbname=users")
    cursor = conn.cursor()

    # Map NoSQL-style operators to SQL
    operator_map = {
        '$eq': '=',
        '$ne': '!=',
        '$gt': '>',
        '$lt': '<',
        '$gte': '>=',
        '$lte': '<='
    }

    if filter_op not in operator_map:
        return {"error": "Invalid operator"}

    sql_op = operator_map[filter_op]

    allowed_fields = ['age', 'created_at', 'last_login', 'points']

    if field not in allowed_fields:
        return {"error": "Invalid field"}

    query = f"SELECT id, username FROM users WHERE {field} {sql_op} %s"
    cursor.execute(query, (value,))

    results = cursor.fetchall()
    conn.close()

    return results
Enter fullscreen mode Exit fullscreen mode

Snippet 66: SQLAlchemy Text with Named Parameters

from sqlalchemy import create_engine, text

def advanced_search(filters: dict):
    """
    filters: {
        "status": "active",
        "min_price": 100,
        "sort_by": "price",
        "sort_dir": "DESC"
    }
    """
    engine = create_engine('postgresql://localhost/products')

    allowed_sort_cols = ['price', 'name', 'created_at', 'stock']
    sort_col = filters.get('sort_by', 'name')

    if sort_col not in allowed_sort_cols:
        sort_col = 'name'

    allowed_directions = ['ASC', 'DESC']
    sort_dir = filters.get('sort_dir', 'ASC').upper()

    if sort_dir not in allowed_directions:
        sort_dir = 'ASC'

    base_query = f"""
        SELECT * FROM products 
        WHERE status = :status 
        AND price >= :min_price
        ORDER BY {sort_col} {sort_dir}
    """

    with engine.connect() as conn:
        result = conn.execute(
            text(base_query),
            {
                "status": filters.get("status", "active"),
                "min_price": filters.get("min_price", 0)
            }
        )
        rows = result.fetchall()

    return [dict(row._mapping) for row in rows]
Enter fullscreen mode Exit fullscreen mode

πŸ’‘ Round 6 Answers & Fixes (Click to Expand)

Round 6 Answers

Snippet Vulnerable? Reason
53 βœ… Yes Django raw() with string interpolation
54 βœ… Yes Second-order injection in audit log query
55 βœ… Yes Array literal not parameterized
56 βœ… Yes Time-based context with string concatenation
57 βœ… Yes INTERVAL {days_old} DAY not parameterized
58 ❌ No JSONB key allowlisted, value parameterized
59 βœ… Yes Stored procedure name dynamically constructed
60 βœ… Yes Error messages leak SQL details enabling blind injection
61 ❌ No Metric allowlisted, limit validated and parameterized
62 βœ… Yes numeric_id interpolated into WHERE clause
63 ❌ No Table allowlisted, parameters validated and parameterized
64 ❌ No XPath expression allowlisted, doc_id parameterized
65 ❌ No Operators mapped via allowlist, field allowlisted, value parameterized
66 ❌ No Sort columns/directions allowlisted, values use named parameters

Snippet 53 Fix (Django raw SQL)

# Vulnerable
sql = "SELECT id, username FROM auth_user WHERE first_name = '%s'" % role_name
users = User.objects.raw(sql)

# Fixed (use parameterization with list)
sql = "SELECT id, username FROM auth_user WHERE first_name = %s"
users = User.objects.raw(sql, [role_name])
Enter fullscreen mode Exit fullscreen mode

Source: Full Stack Python Security, pp. 205-207 β€” Django's raw() method requires manual parameterization by passing parameter values as a list. Never use string interpolation (% operator) with raw SQL.1


Snippet 54 Fix (Second-order SQL injection)

# Vulnerable (third query uses unescaped stored data)
query3 = f"INSERT INTO audit_log (action, details) VALUES ('name_change', 'New name: {name}')"
cursor.execute(query3)

# Fixed (parameterize all queries, even with previously stored data)
query3 = "INSERT INTO audit_log (action, details) VALUES ('name_change', ?)"
cursor.execute(query3, (f'New name: {name}',))
Enter fullscreen mode Exit fullscreen mode

Critical Insight: Second-order SQL injection occurs when data stored safely (via parameterization) is later retrieved and used unsafely in a subsequent query. Always parameterize queries even when using data from your own database.2


Snippet 55 Fix (PostgreSQL array operations)

# Vulnerable
tags_array = '{' + ','.join(tag_list) + '}'
query = f"SELECT * FROM posts WHERE tags &amp;&amp; ARRAY{tags_array}::text[]"
cursor.execute(query)

# Fixed (use ANY with parameterized array)
query = "SELECT * FROM posts WHERE tags &amp;&amp; %s"
cursor.execute(query, (tag_list,))

# Alternative: Use psycopg2's array adaptation
from psycopg2.extensions import adapt
query = "SELECT * FROM posts WHERE tags &amp;&amp; %s::text[]"
cursor.execute(query, (tag_list,))
Enter fullscreen mode Exit fullscreen mode

Source: PostgreSQL array operations require special handling. Use psycopg2's built-in array adaptation or the ANY operator with parameterization.3


Snippet 56 Fix (Time-based attack context)

# Vulnerable (even with validation, string concatenation enables time-based attacks)
query = f"SELECT COUNT(*) FROM users WHERE username = '{username}'"
cursor.execute(query)

# Fixed
query = "SELECT COUNT(*) FROM users WHERE username = %s"
cursor.execute(query, (username,))
Enter fullscreen mode Exit fullscreen mode

Attack Vector: An attacker could inject admin' AND SLEEP(5)-- - to determine if user 'admin' exists based on response time. Input validation doesn't prevent time-based blind SQL injection β€” only parameterization does.4


Snippet 57 Fix (Django database connection)

# Vulnerable
sql = f"""DELETE FROM auth_session 
          WHERE session_type = %s 
          AND expire_date &lt; DATE_SUB(NOW(), INTERVAL {days_old} DAY)"""
cursor.execute(sql, [session_type])

# Fixed (parameterize ALL dynamic values)
sql = """DELETE FROM auth_session 
         WHERE session_type = %s 
         AND expire_date &lt; DATE_SUB(NOW(), INTERVAL %s DAY)"""
cursor.execute(sql, [session_type, days_old])
Enter fullscreen mode Exit fullscreen mode

Source: Full Stack Python Security, pp. 206-207 β€” Django's connection.cursor() requires the params argument for parameterization. Never mix f-strings with partial parameterization.1


Snippet 59 Fix (Stored procedure call)

# Vulnerable (procedure name dynamically constructed)
proc_name = f"generate_{report_type}_report"
cursor.callproc(proc_name, [start_date, end_date])

# Fixed (explicit procedure mapping)
procedure_map = {
    'sales': 'generate_sales_report',
    'inventory': 'generate_inventory_report',
    'users': 'generate_users_report',
    'transactions': 'generate_transactions_report'
}

if report_type not in procedure_map:
    return {"error": "Invalid report type"}

proc_name = procedure_map[report_type]
cursor.callproc(proc_name, [start_date, end_date])
Enter fullscreen mode Exit fullscreen mode

Critical: Stored procedure names cannot be parameterized. Use explicit string mapping from allowlist to procedure name. Never construct procedure names dynamically.5


Snippet 60 Fix (Error-based blind injection)

# Vulnerable (returns detailed SQL errors)
except sqlite3.Error as e:
    conn.close()
    return {"valid": False, "error": str(e)}

# Fixed (use generic error messages)
except sqlite3.Error as e:
    conn.close()
    # Log detailed error server-side only
    logger.error(f"License validation error: {e}")
    # Return generic message to client
    return {"valid": False, "error": "License validation failed"}
Enter fullscreen mode Exit fullscreen mode

Critical: Even with parameterized queries, verbose error messages enable blind SQL injection by revealing query structure. Always return generic errors to clients; log details server-side.6


Snippet 62 Fix (Batch update)

# Vulnerable
query = f"UPDATE products SET price = %s WHERE id = {numeric_id}"
cursor.execute(query, (new_price,))

# Fixed (parameterize both values)
query = "UPDATE products SET price = %s WHERE id = %s"
cursor.execute(query, (new_price, numeric_id))
Enter fullscreen mode Exit fullscreen mode

Common Mistake: Developers often validate input (.isdigit()) and assume this makes direct interpolation safe. Validation is insufficient β€” always parameterize. An attacker could manipulate the request to bypass validation logic before it reaches this function.2



πŸŽ“ Advanced Security Lessons from Round 6

Second-Order SQL Injection (Snippet 54)

Real-World Example: The 2014 eBay data breach involved second-order SQL injection where attackers stored malicious payloads in user profiles, which were later executed when administrators viewed audit logs.7

Defense: Treat ALL data as untrusted, even data from your own database. Previous parameterization doesn't make data "safe" for future queries.

Time-Based Blind SQL Injection (Snippet 56)

Real-World Example: In 2017, security researchers demonstrated time-based blind SQL injection in major telecom provider APIs, extracting entire customer databases using SLEEP() functions to map database structure.8

Defense: Parameterization prevents time-based attacks by ensuring user input can never be interpreted as SQL commands, even when validation appears sufficient.

Error-Based Blind SQL Injection (Snippet 60)

Real-World Example: The 2013 Yahoo! data breach began with error-based SQL injection where attackers used verbose SQL error messages to map database schema before exfiltration.9

Defense Strategy:

  1. βœ… Always parameterize queries
  2. βœ… Return generic error messages to clients
  3. βœ… Log detailed errors server-side only
  4. βœ… Implement rate limiting on error responses
  5. βœ… Monitor for repeated error-triggering patterns

Stored Procedures (Snippet 59)

Critical Rule: Stored procedure names are SQL identifiers and cannot be parameterized. Any dynamic construction of procedure names requires strict allowlist validation.

PostgreSQL-Specific Patterns (Snippets 55, 58, 63)

PostgreSQL's advanced features (arrays, JSONB, CTEs) create unique injection vectors:

  • Arrays: Use psycopg2's built-in array adaptation
  • JSONB operators: ->, ->> keys must be allowlisted (cannot parameterize)
  • CTEs: Table names must be allowlisted; only data values can be parameterized

πŸ“š Extended References

Primary Sources for Round 6:

  1. Full Stack Python Security by Dennis Byrne (Manning, 2021)

    • Chapter 13.7: "SQL Injection" (pp. 205-207)
    • Django raw() method parameterization
    • Database connection query safety
    • Error handling best practices
  2. API Security in Action by Neil Madden (Manning, 2020)

    • Chapter 2.4: "Preventing Injection Attacks" (pp. 42-44)
    • Prepared statements vs string escaping
    • Second-order injection prevention
    • Input validation limitations
  3. Hacking APIs by Corey Ball (No Starch Press, 2022)

    • Chapter 12: "Injection" (pp. 254-258)
    • Blind SQL injection techniques
    • Time-based and error-based attacks
    • NoSQL injection patterns
    • SQLmap exploitation methodology
  4. Secure by Design by Johnsson, Deogun, Sawano (Manning, 2019)

    • Chapter 5: "Domain Primitives"
    • Validation vs parameterization trade-offs
    • Secure-by-default API design
  5. PortSwigger Web Security Academy

    • SQL Injection Labs: Blind SQL injection module
    • Time delays and information retrieval
    • Out-of-band techniques
    • Filter bypass via XML encoding

Additional Research:

  1. OWASP: "Blind SQL Injection" (https://owasp.org/www-community/attacks/Blind_SQL_Injection)

    • Time-based attack patterns
    • Error-based exploitation
    • Boolean-based techniques
  2. CWE-89: "Improper Neutralization of Special Elements used in an SQL Command"

    • Common Weakness Enumeration database
    • Real-world vulnerability examples
    • Mitigation strategies
  3. PostgreSQL Documentation: "Arrays" (https://www.postgresql.org/docs/current/arrays.html)

    • Array type handling in queries
    • Safe array parameter binding
  4. Django Documentation: "Performing raw SQL queries" (https://docs.djangoproject.com/en/stable/topics/db/sql/)

    • Official guidance on raw() method
    • connection.cursor() parameterization

🎯 Final Mastery Check

If you scored 62-66 correctly (including fixes), you've mastered:

βœ… Second-order injection β€” Understanding that database-stored data isn't inherently safe

βœ… Blind SQL injection β€” Time-based and error-based attack patterns

βœ… Framework-specific patterns β€” Django raw SQL, PostgreSQL arrays/JSONB

βœ… Stored procedures β€” Identifier parameterization limitations

βœ… Defensive error handling β€” Preventing information leakage

βœ… Advanced SQL features β€” CTEs, subqueries, XML/XPath contexts

You're ready for:

  • 🏒 Senior Security Engineer interviews
  • πŸ” Production code review responsibilities
  • πŸ›‘οΈ SAST/DAST tool customization
  • πŸ“ Security architecture design documents
  • πŸŽ“ OSWE certification exam

πŸš€ What's Next?

Continue Your Training

Week 5-6: Cross-Site Scripting (XSS) Fundamentals

  • Reflected XSS pattern detection
  • Stored XSS vulnerability analysis
  • DOM-based XSS in modern JavaScript
  • Content Security Policy (CSP) implementation

Week 7-8: Authentication & Session Security

  • JWT vulnerability patterns
  • OAuth 2.0 misconfigurations
  • Session fixation attacks
  • CSRF token validation

Week 11-24: System Design for Security Interviews

  • Designing secure authentication systems
  • Rate limiting architectures
  • API gateway security patterns
  • Threat modeling with STRIDE methodology

Join the Community

⭐ Star the repo: github.com/fosres/SecEng-Exercises

What's coming:

  • πŸ” 100+ additional SQL injection exercises with real CVE patterns
  • πŸ›‘οΈ XSS exercise suite (reflected, stored, DOM-based)
  • πŸ”‘ Authentication security challenges (JWT, OAuth, SAML)
  • πŸ“‘ API security exercises (rate limiting, IDOR, BOLA)
  • πŸ§ͺ 100+ test cases per exercise with detailed explanations
  • πŸ“ Companion blog posts for every challenge

Share Your Progress

Did you complete all 66 exercises? Share your score on:

  • 🐦 Twitter: Tag @fosres with #SQLInjectionMastery
  • πŸ’Ό LinkedIn: Share your completion badge
  • πŸ“ Dev.to: Write about what you learned

Building secure software one exercise at a time. Follow @fosres for weekly security challenges.

Top comments (0)