Assessment Focus: Python 3 built-ins · Functional tools · Modern Python 3.13 · Security fundamentals
Level: Beginner → Advanced — structured for systematic reading and practice
Time to complete: ~90 minutes reading + practice
Table of Contents
- Python 3.13 — What Changed and Why It Matters
- Input & Safe Data Entry
- Core Iteration Built-ins — zip, enumerate, map, filter
- Aggregation Built-ins — sum, any, all, min, max
- Sorting — sorted, sort, key functions
- Lambda Functions & Functional Style
- Comprehensions — list, dict, set, generator
- Exception Handling — try, except, else, finally
- Custom Exceptions & Exception Chaining
- Context Managers & Resource Safety
- String Security — Formatting, Sanitisation, Validation
- Injection Attacks — SQL, Command, Path Traversal
- Password & Secret Handling
- Cryptography Fundamentals in Python
- Authentication — JWT, Sessions, Tokens
- Timing Attacks & Side Channels
- Dangerous Python Patterns to Avoid
- Practice Problems with Full Solutions
- Assessment Cheat Sheet
1. Python 3.13 — What Changed and Why It Matters
Python 3.13 (released October 2024) introduced several changes that directly affect how you write safe, idiomatic code. These are the ones most likely to appear in an assessment testing "modern Python" knowledge.
1.1 The New Interactive Interpreter (REPL)
The REPL was completely rewritten using PyPy's pyrepl library:
# Python 3.13 REPL features:
# ✦ Multi-line editing — navigate up/down within a block
# ✦ Syntax highlighting in the terminal
# ✦ Exit with 'exit' (no parentheses needed, though exit() still works)
# ✦ F1 brings up interactive help browser
# ✦ F2 strips output/prompts for clean copying
# ✦ F3 toggles "paste mode" — prevents auto-indentation issues
# How to check your Python version in code:
import sys
print(sys.version) # '3.13.0 (main, Oct 7 2024, ...)'
print(sys.version_info) # sys.version_info(major=3, minor=13, ...)
print(sys.version_info >= (3, 13)) # True
1.2 input() — The Security-Critical Contract ⚠️
input() did not change in Python 3.13, but its contract is the most security-relevant fact in this section. In every Python 3 release, input() returns the entered line as a plain string and never evaluates it. The dangerous behaviour belonged to Python 2, whose input() evaluated whatever the user typed.
# In Python 2, input() was equivalent to eval(raw_input())
# This was a critical security vulnerability:
# user types: __import__('os').system('rm -rf /')
# Python 2 would EXECUTE it!
# Python 3 (all versions including 3.13):
user_input = input("Enter a value: ")
# Always returns a str, NEVER evaluates
# Even if user types: 2 + 2
# You get the STRING "2 + 2", not the integer 4
# Python 3.13 changes nothing here: input() has returned the raw line
# as a str since Python 3.0, in scripts and in the REPL alike
# ✅ Safe pattern for numeric input
def get_integer(prompt: str) -> int | None:
raw = input(prompt) # always a string
try:
return int(raw) # explicit conversion, may raise ValueError
except ValueError:
print(f"'{raw}' is not a valid integer")
return None
# ✅ Safe pattern with validation
def get_positive_number(prompt: str) -> float:
while True:
raw = input(prompt).strip()
try:
value = float(raw)
if value <= 0:
raise ValueError("Must be positive")
return value
except ValueError as e:
print(f"Invalid input: {e}. Try again.")
# ❌ NEVER do this — eval on input is always wrong
result = eval(input("Enter expression: ")) # arbitrary code execution!
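When the intent behind eval(input(...)) is only to read a Python literal (a number, a list, a dict), the standard library's ast.literal_eval is a commonly used safer substitute. The sketch below is one way to wrap it; the helper name parse_literal is hypothetical, and callers should still cap the input length, since very large or deeply nested literals can exhaust memory or the recursion limit.

```python
import ast

def parse_literal(raw: str):
    """Parse a Python literal from text; return None if it is not a plain literal.

    Note: the literal None also yields None, so use a sentinel if that matters.
    """
    try:
        # literal_eval accepts only literals (numbers, strings, tuples, lists,
        # dicts, sets, booleans, None) and never calls functions or looks up names.
        return ast.literal_eval(raw.strip())
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None

print(parse_literal("[1, 2, 3]"))                       # a real list
print(parse_literal("__import__('os').system('id')"))   # rejected, not executed
print(parse_literal("2 + 2"))                           # rejected: an expression, not a literal
```

Arithmetic such as 2 + 2 is rejected on purpose; a calculator-style tool needs a dedicated expression parser, not eval.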
1.3 Free-Threaded Mode (Experimental in 3.13)
# Python 3.13 adds an experimental free-threaded build (PEP 703)
# that can run threads in parallel without the GIL
# Build CPython with ./configure --disable-gil, then check at runtime:
import sys
print(sys._is_gil_enabled()) # False when the GIL is disabled, True otherwise (new in 3.13)
# Security implication: without the GIL, races that were rare become likely;
# check-then-set patterns such as lazy singletons need an explicit threading.Lock
# Use explicit locks so the code is correct in both modes:
import threading
_lock = threading.Lock()
def thread_safe_operation():
with _lock:
# critical section — safe in both GIL and free-threaded mode
pass
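To make the locking advice concrete, here is a minimal sketch of a shared counter that stays correct with or without the GIL. The names SafeCounter and run_workers are hypothetical and exist only for this illustration.

```python
import threading

class SafeCounter:
    """A counter whose read-modify-write step is guarded by a lock."""

    def __init__(self) -> None:
        self._value = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        with self._lock:          # only one thread updates at a time
            self._value += 1

    @property
    def value(self) -> int:
        with self._lock:
            return self._value

def run_workers(counter: SafeCounter, n_threads: int = 8, n_increments: int = 10_000) -> None:
    """Start n_threads threads that each increment the counter n_increments times."""
    def work() -> None:
        for _ in range(n_increments):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

counter = SafeCounter()
run_workers(counter)
print(counter.value)  # 80000
```

Without the lock, `self._value += 1` is a read followed by a write, so concurrent threads can lose updates.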
1.4 Improved Error Messages
Python's error messages have improved steadily across recent releases, and 3.13 builds on that work (for example, tracebacks are colourised by default in the terminal). Knowing what the messages mean speeds up debugging:
# NameError: suggests similar names (since 3.10)
pint = 3.14
# print(pitn) → NameError: name 'pitn' is not defined. Did you mean: 'pint'?
# AttributeError: suggests similar attributes (since 3.10)
my_list = [1, 2, 3]
# my_list.appnd(4) → AttributeError: 'list' object has no attribute 'appnd'.
# Did you mean: 'append'?
# TypeError: clear messages about argument counts
def greet(name, greeting):
    return f"{greeting}, {name}!"
# greet("Alice") → TypeError: greet() missing 1 required positional argument: 'greeting'
# Since 3.11, tracebacks also mark the exact failing sub-expression with carets (^)
# x = {'a': 1}
# x['b']['c'] → KeyError: 'b', with the carets under x['b']
1.5 Type Parameter Syntax (PEP 695, introduced in 3.12)
# Old style (still valid):
from typing import TypeVar
T = TypeVar('T')
def first(lst: list[T]) -> T:
    return lst[0]
# New style, Python 3.12+ (3.13 adds defaults for type parameters, PEP 696):
def first[T](lst: list[T]) -> T: # T declared inline
    return lst[0]
type Vector = list[float] # type alias statement
type Matrix[T] = list[list[T]] # generic type alias
# These are documentation/tooling hints — still not enforced at runtime
# Use mypy or pyright for static type checking
✏️ Quick Check 1
Q: A junior developer writes result = eval(input("Enter formula: ")). What's the exact security risk?
A: This is arbitrary code execution, and it becomes Remote Code Execution (RCE) whenever the prompt is reachable over a network. Whatever the user types is evaluated as a Python expression with the full permissions of the running process, for example __import__('os').system('cat /etc/passwd') or __import__('subprocess').call(['rm','-rf','/']). Even on internal tools, this violates the principle of least privilege and creates an insider threat vector.
2. Input & Safe Data Entry
2.1 The Full input() Contract
# input() always:
# 1. Displays the prompt string to stdout (no newline)
# 2. Reads one line from stdin
# 3. Strips the trailing newline
# 4. Returns a str — NEVER evaluates, NEVER converts
name = input("Name: ") # "Alice\n" → "Alice"
print(type(name)) # <class 'str'>
# Edge cases
empty = input("Press Enter: ") # user just presses Enter → ""
space = input("Enter: ") # user types " " → " " (spaces preserved!)
2.2 Robust Input Validation Patterns
import re
from typing import Callable, TypeVar
T = TypeVar('T')
def validated_input(
prompt: str,
converter: Callable[[str], T],
validator: Callable[[T], bool] | None = None,
error_msg: str = "Invalid input",
max_attempts: int = 3
) -> T:
"""Generic validated input — try up to max_attempts times."""
for attempt in range(1, max_attempts + 1):
raw = input(prompt).strip()
try:
value = converter(raw)
if validator is not None and not validator(value):
raise ValueError(error_msg)
return value
except (ValueError, TypeError) as e:
remaining = max_attempts - attempt
print(f" Error: {e}. {remaining} attempt(s) remaining.")
raise RuntimeError(f"Failed after {max_attempts} attempts")
# Usage examples:
age = validated_input("Age: ", int, lambda x: 0 < x < 150, "Age must be 1–149")
score = validated_input("Score (0–100): ", float, lambda x: 0 <= x <= 100)
email = validated_input(
"Email: ",
str,
lambda s: bool(re.fullmatch(r'[^@\s]+@[^@\s]+\.[^@\s]+', s)),
"Not a valid email address"
)
2.3 Security-First Input Rules
import html
import unicodedata
# Rule 1: Strip and limit length BEFORE processing
def safe_input(prompt: str, max_length: int = 256) -> str:
raw = input(prompt)
stripped = raw.strip()
if len(stripped) > max_length:
raise ValueError(f"Input too long (max {max_length} chars)")
return stripped
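# Rule 2: Normalise Unicode, and know what normalisation cannot do
# NFKC folds compatibility forms: fullwidth "\uff41dmin" becomes "admin"
def normalise_unicode(s: str) -> str:
    return unicodedata.normalize('NFKC', s)
print(normalise_unicode("\uff41dmin") == "admin") # True: fullwidth 'a' (U+FF41) is folded
# NFKC does NOT map cross-script lookalikes: Cyrillic 'а' (U+0430) stays Cyrillic,
# so homograph detection needs a script check or an allowlist as well
print(normalise_unicode("\u0430dmin") == "admin") # False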
# Rule 3: HTML-escape before rendering in web contexts
user_name = "<script>alert('xss')</script>"
safe_name = html.escape(user_name)
# "&lt;script&gt;alert(&#x27;xss&#x27;)&lt;/script&gt;"
# Rule 4: Whitelist, don't blacklist
def is_safe_username(s: str) -> bool:
# Allow ONLY alphanumeric + underscore, 3–32 chars
return bool(re.fullmatch(r'[a-zA-Z0-9_]{3,32}', s))
# Blacklist approach is always incomplete — attackers find bypasses
def is_safe_username_bad(s: str) -> bool:
    return '<' not in s and '>' not in s # misses %3C, &lt;, etc.
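Unicode normalisation on its own does not catch cross-script lookalikes such as a Cyrillic 'а' standing in for a Latin 'a'. One rough heuristic, sketched below, is to normalise first and then inspect the script prefix of each letter's Unicode character name. The helper names letter_scripts and is_latin_only are hypothetical, and the first word of a character name is only an approximation of its script; a production system would more likely rely on a strict allowlist like is_safe_username.

```python
import unicodedata

def letter_scripts(text: str) -> set[str]:
    """Return the leading word of each letter's Unicode name, e.g. LATIN or CYRILLIC."""
    normalised = unicodedata.normalize("NFKC", text)  # fold fullwidth forms etc. first
    scripts = set()
    for ch in normalised:
        if ch.isalpha():
            name = unicodedata.name(ch, "")
            if name:
                scripts.add(name.split()[0])
    return scripts

def is_latin_only(text: str) -> bool:
    """True if every letter is Latin after normalisation (also True when there are no letters)."""
    return letter_scripts(text) <= {"LATIN"}

print(letter_scripts("admin"))        # {'LATIN'}
print(is_latin_only("\uff41dmin"))    # True: fullwidth 'a' normalises to Latin 'a'
print(is_latin_only("\u0430dmin"))    # False: the first letter is Cyrillic
```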
3. Core Iteration Built-ins — zip, enumerate, map, filter
These are the workhorses of Pythonic code. They all return lazy iterators — they don't compute results until you iterate.
3.1 zip — Parallel Iteration
# zip pairs elements from multiple iterables
names = ["Alice", "Bob", "Carol"]
scores = [95, 87, 91]
grades = ["A", "B+", "A-"]
# Basic zip — stops at shortest iterable
for name, score in zip(names, scores):
print(f"{name}: {score}")
# zip returns an iterator — materialise with list() if needed
pairs = list(zip(names, scores)) # [("Alice",95), ("Bob",87), ("Carol",91)]
# zip with * operator — unzip (transpose)
pairs = [(1, 'a'), (2, 'b'), (3, 'c')]
nums, letters = zip(*pairs) # (1,2,3) and ('a','b','c')
# zip_longest — don't stop at shortest (from itertools)
from itertools import zip_longest
a = [1, 2, 3, 4]
b = ['x', 'y']
list(zip_longest(a, b, fillvalue=None)) # [(1,'x'),(2,'y'),(3,None),(4,None)]
# zip three or more iterables
for name, score, grade in zip(names, scores, grades):
print(f"{name}: {score} ({grade})")
# zip for dict construction
keys = ['host', 'port', 'db']
values = ['localhost', 5432, 'mydb']
config = dict(zip(keys, values))
# {'host': 'localhost', 'port': 5432, 'db': 'mydb'}
# Security use: zip for parallel processing with validation
# (hash_password is assumed to be defined elsewhere)
def process_user_data(usernames: list[str], passwords: list[str]) -> list[dict]:
    if len(usernames) != len(passwords):
        # without this check, zip would silently drop the unmatched tail
        raise ValueError("Mismatched username/password counts")
    return [
        {'username': u.strip(), 'hashed': hash_password(p)}
        for u, p in zip(usernames, passwords)
        if u.strip() # skip empty usernames
    ]
3.2 enumerate — Index + Value
fruits = ["apple", "banana", "cherry", "date"]
# Without enumerate — error-prone
for i in range(len(fruits)):
print(i, fruits[i]) # fragile — easy to have off-by-one errors
# With enumerate — clean and idiomatic
for i, fruit in enumerate(fruits):
print(i, fruit) # 0 apple, 1 banana, ...
# Custom start index
for i, fruit in enumerate(fruits, start=1):
print(f"{i}. {fruit}") # 1. apple, 2. banana, ...
# enumerate returns (index, value) tuples — useful for list of dicts
records = [{'name': 'Alice', 'score': 95}, {'name': 'Bob', 'score': 87}]
for idx, record in enumerate(records):
print(f"Record {idx}: {record['name']} scored {record['score']}")
# Security use: enumerate for audit logging with position context
def validate_config_lines(lines: list[str]) -> list[str]:
    errors = []
    for line_num, line in enumerate(lines, start=1):
        line = line.strip()
        if not line or line.startswith('#'):
            continue # skip empty lines and comments
        if '=' not in line:
            errors.append(f"Line {line_num}: missing '=' in '{line}'")
            continue # nothing to split, so skip the key/value checks
        key, _, value = line.partition('=')
        if not key.strip():
            errors.append(f"Line {line_num}: empty key")
        if not value.strip():
            errors.append(f"Line {line_num}: empty value for key '{key.strip()}'")
    return errors
3.3 map — Apply a Function to Every Element
# map(function, iterable) → lazy iterator
numbers = [1, 4, 9, 16, 25]
roots = list(map(lambda x: x ** 0.5, numbers)) # [1.0, 2.0, 3.0, 4.0, 5.0]
# map with a named function
words = [" hello ", " world ", " python "]
stripped = list(map(str.strip, words)) # ['hello', 'world', 'python']
uppercased = list(map(str.upper, words))
# map with multiple iterables — zip-like, stops at shortest
a = [1, 2, 3]
b = [10, 20, 30]
sums = list(map(lambda x, y: x + y, a, b)) # [11, 22, 33]
# map vs list comprehension — map is faster for simple named functions,
# comprehensions are faster for inline expressions and more readable
# Generally prefer comprehensions for clarity:
stripped_lc = [w.strip() for w in words] # preferred
# Security use: sanitise a batch of user inputs
raw_inputs = [" Alice ", "<Bob>", "Carol\n", " dave@example.com "]
cleaned = list(map(lambda s: html.escape(s.strip()), raw_inputs))
3.4 filter — Keep Elements Matching a Predicate
import html
numbers = [-3, -1, 0, 1, 2, 3, 4, 5]
# filter(predicate, iterable) — keeps elements where predicate is True
positives = list(filter(lambda x: x > 0, numbers)) # [1,2,3,4,5]
non_zero = list(filter(None, numbers)) # [nonzero elements] — filter(None,...) removes falsy
# Equivalent comprehension (preferred for clarity)
positives_lc = [x for x in numbers if x > 0]
# filter with a named function
words = ["hello", "", "world", " ", "python", None, "!"]
# Keep only truthy non-empty strings
valid = list(filter(lambda w: w and w.strip(), words)) # ['hello','world','python','!']
# Chaining filter + map in pipeline style (strip inside map, or the padding survives)
data = [" admin ", "", " Bob ", " ", "charlie"]
cleaned = list(map(lambda s: s.strip().upper(), filter(lambda s: s.strip(), data)))
# ['ADMIN', 'BOB', 'CHARLIE']
# Security use: filter out suspicious inputs before processing
ALLOWED_PATTERN = re.compile(r'[a-zA-Z0-9_@.\-]+')
def filter_safe_inputs(inputs: list[str]) -> list[str]:
    # Strip first, so the values returned are exactly the values that were checked;
    # fullmatch (unlike match with '$') also rejects a trailing newline
    return list(filter(ALLOWED_PATTERN.fullmatch, map(str.strip, inputs)))
suspicious = ["alice", "bob; DROP TABLE users;--", "carol", "../../etc/passwd"]
safe_only = filter_safe_inputs(suspicious) # ['alice', 'carol']
✏️ Practice Problem 3
Problem: You have a list of log entries as strings:
"2024-01-15 ERROR user=admin ip=192.168.1.1 msg=login_failed"
"2024-01-15 INFO user=alice ip=10.0.0.5 msg=login_ok"
"2024-01-15 ERROR user=root ip=203.0.113.5 msg=login_failed"
Using filter, map, and enumerate, extract all ERROR entries, parse each into a dict, and return a list of (line_number, parsed_dict) tuples.
Solution:
logs = [
"2024-01-15 ERROR user=admin ip=192.168.1.1 msg=login_failed",
"2024-01-15 INFO user=alice ip=10.0.0.5 msg=login_ok",
"2024-01-15 ERROR user=root ip=203.0.113.5 msg=login_failed",
"2024-01-15 WARN user=bob ip=172.16.0.1 msg=rate_limited",
]
def parse_log_line(line: str) -> dict:
parts = line.split()
date = parts[0]
level = parts[1].strip()
fields = {}
for part in parts[2:]:
if '=' in part:
k, _, v = part.partition('=')
fields[k] = v
return {'date': date, 'level': level, **fields}
# Step 1: enumerate gives us (line_number, line)
# Step 2: filter keeps only ERROR lines
# Step 3: map parses each into a dict
numbered = list(enumerate(logs, start=1)) # (linenum, line) tuples
error_entries = list(filter(
lambda pair: 'ERROR' in pair[1],
numbered
)) # [(1, "...ERROR..."), (3, "...ERROR...")]
parsed_errors = list(map(
lambda pair: (pair[0], parse_log_line(pair[1])),
error_entries
))
for line_num, entry in parsed_errors:
print(f"Line {line_num}: {entry}")
# Line 1: {'date': '2024-01-15', 'level': 'ERROR', 'user': 'admin', 'ip': '192.168.1.1', 'msg': 'login_failed'}
# Line 3: {'date': '2024-01-15', 'level': 'ERROR', 'user': 'root', 'ip': '203.0.113.5', 'msg': 'login_failed'}
4. Aggregation Built-ins — sum, any, all, min, max
4.1 sum — Flexible Accumulation
# Basic sum — works on any numeric iterable
numbers = [1, 2, 3, 4, 5]
total = sum(numbers) # 15
# start parameter — initial value (crucial for non-zero starts)
sum(numbers, start=100) # 115
sum(numbers, 0) # 15 (positional, but named is clearer)
# Summing non-integers — as long as + is defined
from decimal import Decimal
prices = [Decimal('9.99'), Decimal('4.99'), Decimal('14.99')]
total = sum(prices, Decimal('0')) # Decimal('29.97')
# Sum of a generator expression — memory efficient
total_sq = sum(x**2 for x in range(1000)) # no intermediate list!
# Flatten one level with sum + start=[] (works but is slow — use itertools.chain)
nested = [[1,2], [3,4], [5,6]]
flat = sum(nested, start=[]) # [1,2,3,4,5,6] — but O(n²)! avoid for large lists
# Better:
import itertools
flat = list(itertools.chain.from_iterable(nested)) # O(n)
# Security use: count failed attempts from events list
events = [
{'type': 'login_fail', 'user': 'admin'},
{'type': 'login_ok', 'user': 'alice'},
{'type': 'login_fail', 'user': 'root'},
{'type': 'login_fail', 'user': 'root'},
]
fail_count = sum(1 for e in events if e['type'] == 'login_fail') # 3
# Or: sum(e['type'] == 'login_fail' for e in events) — bools are ints (True=1, False=0)
4.2 any and all — Logical Reductions
# any(iterable) — True if ANY element is truthy (short-circuits on first True)
# all(iterable) — True if ALL elements are truthy (short-circuits on first False)
flags = [True, False, True, True]
print(any(flags)) # True — at least one True
print(all(flags)) # False — not ALL are True
# Edge cases — always know these
any([]) # False — no elements to be True
all([]) # True — vacuously true (no element is False)
# With generator expressions — lazy, short-circuits
numbers = range(1, 1_000_000)
has_even = any(n % 2 == 0 for n in numbers) # stops at n=2
all_pos = all(n > 0 for n in numbers) # stops at first n ≤ 0 (never, so scans all)
# Security uses — validation pipelines
def validate_password(password: str) -> tuple[bool, list[str]]:
checks = [
(len(password) >= 12, "at least 12 characters"),
(any(c.isupper() for c in password), "at least one uppercase letter"),
(any(c.islower() for c in password), "at least one lowercase letter"),
(any(c.isdigit() for c in password), "at least one digit"),
(any(c in '!@#$%^&*()_+-=' for c in password), "at least one special character"),
]
failures = [msg for passed, msg in checks if not passed]
return (len(failures) == 0, failures)
ok, errors = validate_password("MyPass1!")
print(ok, errors) # False, ['at least 12 characters']
ok, errors = validate_password("MySecurePass1!")
print(ok, errors) # True, []
# Check multiple permissions at once
def can_access(user: dict, resource: str) -> bool:
required_permissions = ['authenticated', 'active', f'can_read_{resource}']
return all(user.get(p, False) for p in required_permissions)
# Validate a list of IP addresses
import ipaddress
def all_ips_valid(ip_list: list[str]) -> bool:
def is_valid_ip(ip: str) -> bool:
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
return all(is_valid_ip(ip) for ip in ip_list)
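The vacuous-truth edge case matters for access checks: all([]) is True, so a check driven by an empty list of required permissions grants access. The sketch below denies by default instead; has_all_permissions is a hypothetical helper, not part of any library.

```python
def has_all_permissions(user: dict, required: list[str]) -> bool:
    """Deny by default: an empty requirement list never grants access."""
    if not required:
        return False
    return all(user.get(p, False) for p in required)

user = {"authenticated": True, "active": True}

print(all([]))                                                   # True (vacuous truth)
print(has_all_permissions(user, []))                             # False
print(has_all_permissions(user, ["authenticated", "active"]))    # True
print(has_all_permissions(user, ["authenticated", "can_read"]))  # False
```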
4.3 min and max — With Custom Keys
students = [
{'name': 'Alice', 'score': 95, 'age': 20},
{'name': 'Bob', 'score': 87, 'age': 22},
{'name': 'Carol', 'score': 91, 'age': 21},
]
# Default min/max on numbers
print(min(3, 1, 4, 1, 5, 9)) # 1
print(max([3, 1, 4, 1, 5, 9])) # 9
# With key function — transforms each element before comparison
top_scorer = max(students, key=lambda s: s['score']) # Alice (95)
youngest = min(students, key=lambda s: s['age']) # Alice (20)
name_first = min(students, key=lambda s: s['name']) # Alice (alphabetical)
# default parameter — avoids ValueError on empty iterable
winner = max([], default=None) # None instead of ValueError
# Security use: find the most suspicious IP (most failed attempts)
login_attempts = [
{'ip': '192.168.1.1', 'failures': 3},
{'ip': '203.0.113.5', 'failures': 47},
{'ip': '10.0.0.5', 'failures': 1},
]
most_suspicious = max(login_attempts, key=lambda x: x['failures'])
print(most_suspicious['ip']) # '203.0.113.5'
5. Sorting — sorted, sort, key functions
5.1 sorted vs .sort()
# sorted() — returns a NEW sorted list, works on any iterable
nums = [3, 1, 4, 1, 5, 9, 2, 6]
asc = sorted(nums) # [1,1,2,3,4,5,6,9] — nums unchanged
desc = sorted(nums, reverse=True) # [9,6,5,4,3,2,1,1]
# .sort() — in-place, returns None, only on lists
nums.sort() # modifies nums in place
nums.sort(reverse=True) # in-place descending
# GOTCHA: sorted() returns a list even from non-list iterables
sorted_str = sorted("python") # ['h','n','o','p','t','y'] — sorted chars
sorted_set = sorted({3,1,2}) # [1,2,3]
sorted_dict = sorted({'b':2,'a':1}) # ['a','b'] — sorts KEYS only
# Stability — Python's sort is stable (TimSort)
# Elements with equal keys maintain their original relative order
data = [('Bob', 90), ('Alice', 90), ('Carol', 85)]
sorted(data, key=lambda x: x[1])
# [('Carol',85), ('Bob',90), ('Alice',90)] — Bob before Alice preserved!
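Stability is what makes multi-pass sorting work: sort by the secondary key first, then by the primary key, and ties on the primary key keep the secondary order. A short sketch with made-up data:

```python
from operator import itemgetter

results = [("Bob", 90), ("Alice", 90), ("Carol", 85), ("Dave", 95)]

# Pass 1: secondary key (name, ascending)
by_name = sorted(results, key=itemgetter(0))
# Pass 2: primary key (score, descending); equal scores keep the name order from pass 1
ranked = sorted(by_name, key=itemgetter(1), reverse=True)

print(ranked)
# [('Dave', 95), ('Alice', 90), ('Bob', 90), ('Carol', 85)]
```

This is useful when the two keys need different directions and the values cannot simply be negated, as with strings.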
5.2 The key Parameter — Transformation Before Comparison
# key= receives a function applied to each element ONCE before comparison
# The key values are compared, not the original elements
# Case-insensitive sort
words = ["Banana", "apple", "Cherry", "date"]
sorted(words, key=str.lower) # ['apple', 'Banana', 'Cherry', 'date']
# Sort by string length, then alphabetically (multi-key sort)
sorted(words, key=lambda w: (len(w), w.lower()))
# ['date', 'apple', 'Banana', 'Cherry']
# Sort dicts by a field
users = [
{'name': 'Carol', 'last_login': '2024-01-10', 'role': 'admin'},
{'name': 'Alice', 'last_login': '2024-01-15', 'role': 'user'},
{'name': 'Bob', 'last_login': '2024-01-12', 'role': 'user'},
]
by_login = sorted(users, key=lambda u: u['last_login'], reverse=True)
# Most recent login first: Alice → Bob → Carol
# Sort by role then name (admin first, then alphabetical)
by_role = sorted(users, key=lambda u: (u['role'] != 'admin', u['name']))
# Carol (admin) first, then Alice, Bob
# operator.itemgetter — faster than lambda for dict/tuple access
from operator import itemgetter, attrgetter
by_name = sorted(users, key=itemgetter('name')) # faster than lambda u: u['name']
# attrgetter for objects
from dataclasses import dataclass
@dataclass
class User:
name: str
score: int
active: bool
user_list = [User('Bob', 70, True), User('Alice', 90, False), User('Carol', 80, True)]
active_by_score = sorted(
filter(attrgetter('active'), user_list), # filter active users
key=attrgetter('score'),
reverse=True
) # [Carol(80), Bob(70)]
5.3 Security Relevance of Sorting
# Sorting for security reporting
def top_n_attackers(events: list[dict], n: int = 10) -> list[dict]:
"""Return top N IPs by number of failed attempts."""
from collections import Counter
failure_counts = Counter(
e['ip'] for e in events if e.get('event') == 'login_failure'
)
# Sort by count descending, then IP for determinism
return sorted(
[{'ip': ip, 'failures': count} for ip, count in failure_counts.items()],
key=lambda x: (-x['failures'], x['ip'])
)[:n]
# Priority queue for processing security alerts
alerts = [
{'severity': 3, 'msg': 'Port scan detected'},
{'severity': 1, 'msg': 'Failed login'},
{'severity': 5, 'msg': 'Root shell spawned'},
{'severity': 2, 'msg': 'Unusual outbound traffic'},
]
priority_order = sorted(alerts, key=lambda a: -a['severity'])
# Severity 5 first — critical alerts processed first
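sorted() gives a one-off ordering. When alerts keep arriving while others are being processed, the standard library's heapq module is a common way to maintain a real priority queue. A minimal sketch; the negated severity and the sequence number used as a tie-breaker are illustration choices, not something the text above prescribes.

```python
import heapq

alerts = [
    {"severity": 3, "msg": "Port scan detected"},
    {"severity": 1, "msg": "Failed login"},
    {"severity": 5, "msg": "Root shell spawned"},
    {"severity": 2, "msg": "Unusual outbound traffic"},
]

# heapq is a min-heap, so negate severity to pop the most severe alert first.
# The sequence number breaks ties without ever comparing the message strings.
heap = [(-a["severity"], seq, a["msg"]) for seq, a in enumerate(alerts)]
heapq.heapify(heap)

# A new alert arrives while the queue is live
heapq.heappush(heap, (-4, len(alerts), "Privilege escalation"))

processed = []
while heap:
    neg_severity, _, msg = heapq.heappop(heap)
    processed.append((-neg_severity, msg))

print(processed)
```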
6. Lambda Functions & Functional Style
6.1 Lambda Syntax and Scope
# lambda args: expression
# Single expression only — no statements, no assignments, no return keyword
double = lambda x: x * 2 # ← function object
add = lambda x, y: x + y
identity = lambda x: x
# Default arguments
greet = lambda name, greeting="Hello": f"{greeting}, {name}!"
# Immediately Invoked Lambda (unusual but valid)
result = (lambda x, y: x ** y)(2, 10) # 1024
# Lambdas are closures — they capture surrounding variables by reference!
multipliers = [lambda x, n=n: x * n for n in range(5)]
# Note: n=n captures the current value (default arg trick)
# Without n=n: all lambdas would capture n=4 (the final loop value)
# ❌ Common closure bug:
bad_fns = [lambda x: x * n for n in range(5)]
bad_fns[0](1) # 4! Not 0 — n=4 (final loop value) for all of them
# ✅ Capture loop variable correctly:
good_fns = [lambda x, n=n: x * n for n in range(5)]
good_fns[0](1) # 0
good_fns[3](1) # 3
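An alternative to the n=n default-argument trick is functools.partial, which binds the value when the callable is created. A brief sketch:

```python
from functools import partial
from operator import mul

# partial(mul, n) freezes the current n as the first argument of mul
multipliers = [partial(mul, n) for n in range(5)]

results = [f(10) for f in multipliers]
print(results)  # [0, 10, 20, 30, 40]
```

Unlike a default argument, the bound value cannot be overridden by accident when a caller passes an extra positional argument in its place.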
6.2 Lambda with Built-ins
data = [{'id': 3, 'val': 9}, {'id': 1, 'val': 25}, {'id': 2, 'val': 4}]
# sorted with lambda key
sorted(data, key=lambda d: d['val']) # by val ascending
# max/min with lambda key
max(data, key=lambda d: d['id']) # id=3
# map with lambda
list(map(lambda d: d['val'] ** 0.5, data)) # [3.0, 5.0, 2.0]
# filter with lambda
list(filter(lambda d: d['val'] > 5, data)) # [{'id':3,'val':9}, {'id':1,'val':25}]
# Chained functional pipeline:
result = list(
map(
lambda d: {**d, 'sqrt_val': round(d['val'] ** 0.5, 2)},
filter(lambda d: d['val'] > 0, data)
)
)
6.3 When NOT to Use Lambda
# ❌ Too complex — unreadable
process = lambda x: (x.strip().lower() if isinstance(x, str) else str(x)).replace(' ', '_')
# ✅ Use a named function — self-documenting
def normalise_field(x) -> str:
"""Convert to lowercase string with spaces as underscores."""
if not isinstance(x, str):
x = str(x)
return x.strip().lower().replace(' ', '_')
# ❌ Lambda assigned to a name (PEP 8 violation — use def instead)
square = lambda x: x**2 # linters will flag this
# ✅ Named function
def square(x): return x ** 2
# Lambda is best as an inline anonymous key/transform passed to another function
# Not as a named reusable function
7. Comprehensions — list, dict, set, generator
7.1 List Comprehensions
# [expression for item in iterable if condition]
squares = [x**2 for x in range(10)]
even_squares = [x**2 for x in range(10) if x % 2 == 0]
# Nested comprehension — matrix operations
matrix = [[1,2,3],[4,5,6],[7,8,9]]
flat = [cell for row in matrix for cell in row] # [1,2,3,4,5,6,7,8,9]
transposed = [[row[i] for row in matrix] for i in range(3)]
# Walrus operator (:=) in comprehension (Python 3.8+)
# Compute and keep only non-None results without calling the function twice
import re
lines = ["error: foo", "warning: bar", "info: baz", "error: qux"]
error_msgs = [
m.group(1)
for line in lines
if (m := re.match(r'error: (.+)', line))
]
# ['foo', 'qux']
7.2 Dict and Set Comprehensions
# Dict comprehension: {key_expr: value_expr for item in iterable if condition}
word = "mississippi"
freq = {char: word.count(char) for char in set(word)}
# {'m':1, 'i':4, 's':4, 'p':2}
# Invert a dict (assumes unique values)
original = {'a': 1, 'b': 2, 'c': 3}
inverted = {v: k for k, v in original.items()} # {1:'a', 2:'b', 3:'c'}
# Filter a dict
scores = {'Alice': 95, 'Bob': 60, 'Carol': 82, 'Dave': 45}
passing = {name: score for name, score in scores.items() if score >= 70}
# {'Alice': 95, 'Carol': 82}
# Normalize dict keys (security: prevent key confusion attacks)
raw_config = {' Host ': 'localhost', 'PORT': '5432', 'Db_Name': 'mydb'}
clean_config = {k.strip().lower(): v.strip() for k, v in raw_config.items()}
# {'host': 'localhost', 'port': '5432', 'db_name': 'mydb'}
# Set comprehension
unique_domains = {email.split('@')[1].lower() for email in [
'Alice@Example.com', 'bob@gmail.com', 'carol@EXAMPLE.COM'
]}
# {'example.com', 'gmail.com'}
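The frequency comprehension above calls word.count once per distinct character, rescanning the string each time. For longer inputs, collections.Counter produces the same mapping in a single pass. A short sketch:

```python
from collections import Counter

word = "mississippi"
freq = Counter(word)          # one pass over the string

print(dict(freq))             # {'m': 1, 'i': 4, 's': 4, 'p': 2}
print(freq["z"])              # 0: missing keys count as zero, no KeyError
```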
7.3 Generator Expressions — Lazy Evaluation
# Generator expressions compute values ONE AT A TIME on demand
# Critical for large datasets — use O(1) memory regardless of size
# Materialise with list() or iterate directly
gen = (x**2 for x in range(10)) # NO computation yet
next(gen) # 0 — computes first value
next(gen) # 1 — computes second value
# Pass directly to functions that accept iterables
total = sum(x**2 for x in range(1_000_000)) # O(1) memory!
with open('file.txt') as f: # the with block closes the file deterministically
    maximum = max(len(line) for line in f) # line by line
# Generator with filter
safe_inputs = (
s.strip()
for s in raw_inputs
if s.strip() and len(s.strip()) <= 256
)
# When to use list vs generator:
# - list comprehension: when you need the result multiple times, or need len()
# - generator expression: when you only iterate once, especially for large data
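The "iterate once" rule is easy to forget: a generator is exhausted after its first full pass, and a second pass silently yields nothing. A minimal demonstration:

```python
squares = (x * x for x in range(5))

first_pass = sum(squares)     # consumes the generator: 0 + 1 + 4 + 9 + 16
second_pass = sum(squares)    # nothing left to consume

print(first_pass, second_pass)  # 30 0

# A list can be traversed as often as needed
squares_list = [x * x for x in range(5)]
print(sum(squares_list), sum(squares_list))  # 30 30
```

In validation code this matters: a second check run over an already consumed generator of inputs passes trivially because it sees no items.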
8. Exception Handling — try, except, else, finally
This is one of the most important Python topics for writing robust, secure code. Sloppy exception handling is a major source of security vulnerabilities.
8.1 The Full try/except/else/finally Anatomy
import logging

def read_config(path: str) -> dict:
    try:
        # ── Code that might raise exceptions ──
        with open(path, 'r') as f:
            content = f.read()
        config = parse_config(content)
    except FileNotFoundError:
        # ── Specific exception: handle gracefully ──
        print(f"Config file not found: {path}")
        return {}
    except PermissionError:
        # ── Different specific exception ──
        raise RuntimeError("Cannot read config: permission denied") from None
    except (ValueError, KeyError) as e:
        # ── Multiple exceptions in one handler ──
        raise ValueError(f"Malformed config in {path}: {e}") from e
    except Exception:
        # ── Catch-all for truly unexpected errors ──
        # Log it, then re-raise; never swallow silently
        logging.exception("Unexpected error reading %s", path)
        raise # re-raise the SAME exception with traceback preserved
    else:
        # ── Runs ONLY if try finished without raising AND without returning ──
        # That is why the success return lives here and not inside try
        # NOT protected by the excepts above it
        log_successful_config_load(path)
        return config
    finally:
        # ── ALWAYS runs, exception or not ──
        # Use for guaranteed cleanup
        cleanup_temp_resources()
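The order in which the four clauses run is easiest to see by recording it. The sketch below uses a hypothetical trace helper; note in particular that a return inside try skips else but still runs finally.

```python
def trace(mode: str) -> list[str]:
    """Record which clauses run for mode 'ok', 'raise', or 'return'."""
    steps: list[str] = []

    def run() -> str:
        try:
            steps.append("try")
            if mode == "raise":
                raise ValueError("boom")
            if mode == "return":
                return "early"      # leaves try via return, so else is skipped
        except ValueError:
            steps.append("except")
        else:
            steps.append("else")
        finally:
            steps.append("finally")  # runs in every case
        return "end"

    run()
    return steps

print(trace("ok"))      # ['try', 'else', 'finally']
print(trace("raise"))   # ['try', 'except', 'finally']
print(trace("return"))  # ['try', 'finally']
```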
8.2 Exception Hierarchy — Know What You're Catching
# Built-in exception hierarchy (abbreviated):
# BaseException
# ├── SystemExit ← sys.exit()
# ├── KeyboardInterrupt ← Ctrl+C
# ├── GeneratorExit ← generator .close()
# └── Exception ← Almost everything else
# ├── StopIteration ← end of iteration
# ├── ArithmeticError
# │ ├── ZeroDivisionError
# │ └── OverflowError
# ├── LookupError
# │ ├── IndexError ← list[999]
# │ └── KeyError ← dict['missing']
# ├── TypeError ← wrong type for operation
# ├── ValueError ← right type, wrong value
# ├── OSError (IOError, EnvironmentError)
# │ ├── FileNotFoundError
# │ ├── PermissionError
# │ └── ConnectionError
# ├── RuntimeError
# └── AttributeError
# ❌ WRONG — catching BaseException catches SystemExit and Ctrl+C!
try:
risky()
except BaseException:
pass # Ctrl+C now broken — user can't interrupt!
# ❌ WRONG — bare except has same problem
try:
risky()
except:
pass
# ✅ RIGHT — catch Exception if you want "everything except system signals"
try:
risky()
except Exception as e:
handle(e)
# ✅ BEST — catch SPECIFIC exceptions
try:
value = int(user_input)
except ValueError:
print("Not a valid integer")
8.3 Exception Patterns for Secure Code
# Pattern 1: Fail-safe defaults
def get_user_role(user_id: int, db) -> str:
try:
user = db.query(user_id)
return user.role
except (DatabaseError, KeyError, AttributeError):
# On ANY error, return most restrictive role
return 'anonymous' # fail-safe: deny access rather than grant it
# Pattern 2: Specific exception for each failure mode
def authenticate(username: str, password: str) -> dict:
try:
user = find_user(username) # may raise UserNotFoundError
verify_password(password, user.hash) # may raise InvalidCredentials
if not user.active:
raise AccountDisabledError(f"Account {username} is disabled")
return generate_token(user)
except (UserNotFoundError, InvalidCredentials):
# SAME error message for both — prevents username enumeration!
raise AuthenticationError("Invalid username or password") from None
except AccountDisabledError:
raise # re-raise as-is for specific handling by caller
# Pattern 3: Never swallow exceptions silently
def bad_auth(username, password):
try:
return do_auth(username, password)
except:
return None # ← SILENT FAILURE: bug? attack? we'll never know
def good_auth(username, password):
try:
return do_auth(username, password)
except AuthError:
raise # propagate to caller
except Exception as e:
logging.error(f"Unexpected auth error: {type(e).__name__}: {e}")
raise RuntimeError("Authentication service unavailable") from e
# Pattern 4: Context-safe exception handling
import contextlib
import os
# Suppress a specific exception (use sparingly)
with contextlib.suppress(FileNotFoundError):
    os.remove(temp_file) # temp_file: a path defined elsewhere; fine if it is already gone
8.4 The else Clause — Often Forgotten
# The else clause runs only when try completes WITHOUT raising
# This is subtly different from putting code at the end of try
# Without else: parse() sits inside try, so a NetworkError raised while parsing
# is handled as if the fetch had failed
try:
    data = fetch_data()
    result = parse(data) # a NetworkError from here is indistinguishable from a fetch error
except NetworkError as e:
    handle_network_error(e)
# With else: clear separation of concerns
try:
    data = fetch_data() # only network errors caught here
except NetworkError as e:
    handle_network_error(e)
else:
    result = parse(data) # parse errors propagate naturally (not caught)
    process(result)
9. Custom Exceptions & Exception Chaining
9.1 Designing a Custom Exception Hierarchy
# Base exception for your application — all app errors inherit from this
class AppError(Exception):
"""Base exception for all application errors."""
def __init__(self, message: str, code: str | None = None):
super().__init__(message)
self.code = code
self.message = message
def __str__(self):
if self.code:
return f"[{self.code}] {self.message}"
return self.message
# Specific error categories
class ValidationError(AppError):
"""Raised when user input fails validation."""
pass
class AuthenticationError(AppError):
"""Raised when authentication fails."""
def __init__(self, message: str = "Invalid credentials"):
super().__init__(message, code="AUTH_FAILED")
class AuthorizationError(AppError):
"""Raised when authenticated user lacks permission."""
def __init__(self, resource: str, action: str):
super().__init__(
f"Permission denied: cannot {action} on {resource}",
code="AUTHZ_DENIED"
)
self.resource = resource
self.action = action
class DatabaseError(AppError):
"""Raised when database operations fail."""
pass
class RateLimitError(AppError):
"""Raised when rate limit is exceeded."""
def __init__(self, retry_after: int = 60):
super().__init__(
f"Rate limit exceeded. Try again in {retry_after}s.",
code="RATE_LIMITED"
)
self.retry_after = retry_after
# Usage
def get_user(user_id: int) -> dict:
if not isinstance(user_id, int) or user_id <= 0:
raise ValidationError(f"Invalid user_id: {user_id!r}", code="INVALID_ID")
user = db.find(user_id)
if user is None:
raise AuthenticationError()
if not user['active']:
raise AuthorizationError(resource=f"user/{user_id}", action="access")
return user
9.2 Exception Chaining — raise ... from ...
# Python 3: exceptions chain automatically (implicit chaining)
try:
int("abc")
except ValueError:
raise RuntimeError("Config parsing failed")
# RuntimeError shows "During handling of the above exception, another exception occurred"
# The original ValueError is accessible as __context__
# Explicit chaining: raise X from Y — use when you want to document the cause
def load_config(path: str) -> dict:
try:
with open(path) as f:
return json.load(f)
except FileNotFoundError as e:
raise AppError(f"Config not found: {path}") from e # e is the __cause__
except json.JSONDecodeError as e:
raise ValidationError(f"Malformed JSON in {path}: {e}") from e
# Suppress chaining: raise X from None — hides the original
# Use when the original exception contains sensitive information
def authenticate(username: str, password: str):
try:
user = db.query("SELECT * FROM users WHERE username=?", (username,))
if user is None:
raise AuthenticationError()
except DatabaseError as e:
# The DatabaseError might reveal internal schema details
# raise AuthenticationError() from e ← LEAKS db info in traceback
raise AuthenticationError() from None # clean — hides db error
# Accessing chained exceptions
try:
load_config("missing.json")
except AppError as e:
print(e) # Config not found: missing.json (no code was passed, so __str__ adds no [code] prefix)
print(e.__cause__) # [Errno 2] No such file or directory: 'missing.json'
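The three chaining modes described in 9.2 can be checked directly by inspecting `__cause__`, `__context__` and `__suppress_context__` on the raised exception. The following is a minimal sketch; the helper names (`implicit`, `explicit`, `suppressed`, `capture`) are hypothetical and exist only for this illustration.

```python
def implicit():
    try:
        int("abc")
    except ValueError:
        raise RuntimeError("implicit")  # no 'from': only __context__ is set

def explicit():
    try:
        int("abc")
    except ValueError as e:
        raise RuntimeError("explicit") from e  # sets __cause__

def suppressed():
    try:
        int("abc")
    except ValueError:
        raise RuntimeError("suppressed") from None  # hides the original in tracebacks

def capture(func):
    """Run func and return the RuntimeError it raises."""
    try:
        func()
    except RuntimeError as exc:
        return exc
    raise AssertionError("func did not raise")

imp, exp, sup = capture(implicit), capture(explicit), capture(suppressed)
print(type(imp.__context__).__name__, imp.__cause__)   # ValueError None
print(type(exp.__cause__).__name__)                    # ValueError
print(sup.__cause__, sup.__suppress_context__)         # None True
```

Note that `from None` does not delete the original exception: it remains on `__context__`, and only the traceback display is suppressed.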
9.3 Exception Groups (Python 3.11+)
# ExceptionGroup lets you raise/handle multiple exceptions at once
# Useful for parallel operations that can fail independently
# Example: asyncio.TaskGroup (3.11+) raises an ExceptionGroup when several tasks fail
# Raising an ExceptionGroup
def validate_form(data: dict) -> None:
errors = []
if not data.get('email'):
errors.append(ValueError("Email is required"))
if not data.get('username') or len(data['username']) < 3:
errors.append(ValueError("Username must be at least 3 characters"))
if errors:
raise ExceptionGroup("Form validation failed", errors)
# Handling with except* (Python 3.11+)
try:
validate_form({'username': 'ab'})
except* ValueError as eg:
for exc in eg.exceptions:
print(f"Validation error: {exc}")
10. Context Managers & Resource Safety
10.1 The with Statement Contract
# The context manager protocol: __enter__ and __exit__
# __enter__: called at start, return value is bound to 'as' variable
# __exit__: ALWAYS called — even on exception, return, or Ctrl+C
# File handling — the canonical example
with open('secret.txt', 'r') as f:
data = f.read()
# f.close() is GUARANTEED even if read() raises an exception
# Multiple context managers (Python 3.10+ — parenthesised form)
with (
open('input.txt', 'r') as src,
open('output.txt', 'w') as dst,
database.transaction() as tx
):
dst.write(src.read())
tx.commit()
# Equivalent to nesting the with statements (shown for the two files only):
with open('input.txt', 'r') as src:
with open('output.txt', 'w') as dst:
dst.write(src.read())
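The `__enter__`/`__exit__` contract from 10.1 is easiest to see in a hand-written class. This is a small sketch with a hypothetical `Tracked` resource that records each lifecycle step, so the order of calls is visible when the body raises.

```python
class Tracked:
    """Hypothetical resource that records its lifecycle events."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def __enter__(self) -> "Tracked":
        self.events.append("enter")
        return self  # this value is bound to the name after 'as'

    def __exit__(self, exc_type, exc, tb) -> bool:
        label = exc_type.__name__ if exc_type else "clean"
        self.events.append(f"exit:{label}")
        return False  # False: do not swallow the exception

resource = Tracked()
try:
    with resource as r:
        r.events.append("body")
        raise ValueError("boom")
except ValueError:
    resource.events.append("propagated")

print(resource.events)
# ['enter', 'body', 'exit:ValueError', 'propagated']
```

Returning `True` from `__exit__` would suppress the exception, which is rarely what you want in security-sensitive code.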
10.2 contextlib — Building Context Managers
import contextlib
import threading
# @contextmanager — simplest way to build a context manager
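@contextlib.contextmanager
def managed_temp_file(suffix: str = '.tmp'):
    import tempfile, os
    # mkstemp creates the file atomically with owner-only permissions;
    # tempfile.mktemp is deprecated because another process can claim the name first
    fd, path = tempfile.mkstemp(suffix=suffix)
    os.close(fd)  # callers reopen the file by path
    try:
        yield path  # everything BEFORE yield is __enter__
    finally:
        # everything in finally is __exit__ and ALWAYS runs
        if os.path.exists(path):
            os.remove(path)
            print(f"Cleaned up {path}")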
with managed_temp_file('.json') as path:
with open(path, 'w') as f:
json.dump({'secret': 'data'}, f)
process(path)
# file deleted automatically even if process() raises
# Context manager for timing
@contextlib.contextmanager
def timer(label: str):
import time
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"{label}: {elapsed:.4f}s")
with timer("Database query"):
results = db.query(sql)
# Context manager for acquiring/releasing locks
@contextlib.contextmanager
def locked(lock: threading.Lock, timeout: float = 5.0):
acquired = lock.acquire(timeout=timeout)
if not acquired:
raise RuntimeError("Could not acquire lock within timeout")
try:
yield
finally:
lock.release()
10.3 Security-Critical Resource Patterns
import contextlib, secrets, os
# Pattern: ensure sensitive data is cleared from memory
@contextlib.contextmanager
def sensitive_buffer():
"""Clears sensitive bytes from memory on exit."""
data = bytearray(256) # mutable — we can zero it out
try:
yield data
finally:
# Zero out the buffer (best effort: shortens how long the secret stays in memory,
# but copies made elsewhere, e.g. bytes(data) or str conversions, are not cleared)
for i in range(len(data)):
data[i] = 0
with sensitive_buffer() as buf:
# read password into buf
pass
# buf is now all zeros
# Pattern: atomic file write (prevent partial writes on crash)
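@contextlib.contextmanager
def atomic_write(path: str, mode: str = 'w'):
    """Write to a temp file, then atomically rename to target."""
    tmp_path = path + f'.{secrets.token_hex(4)}.tmp'
    try:
        with open(tmp_path, mode) as f:
            yield f
            f.flush()
            os.fsync(f.fileno())  # push data to disk before the rename
        # Rename only after the file is closed; os.replace is atomic when
        # source and target are on the same filesystem
        os.replace(tmp_path, path)
    except BaseException:  # also clean up on KeyboardInterrupt
        with contextlib.suppress(FileNotFoundError):
            os.remove(tmp_path)
        raise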
with atomic_write('/etc/app/config.json') as f:
json.dump(new_config, f, indent=2)
# Readers always see a complete file — never a partial write
11. String Security — Formatting, Sanitisation, Validation
11.1 f-strings and Format Security
# f-strings (Python 3.6+) — fast, readable, but NOT safe for untrusted templates
name = "Alice"
score = 95.678
status = True
# Basic f-string
print(f"Hello, {name}!")
# Format specifiers
print(f"Score: {score:.2f}") # 95.68 — 2 decimal places
print(f"Score: {score:08.2f}") # 00095.68 — padded
print(f"Hex: {255:#010x}") # 0x000000ff
print(f"Big number: {1_000_000:,}") # 1,000,000
# Expressions inside f-strings
print(f"{'PASS' if status else 'FAIL'}")
print(f"{name.upper()!r}") # 'ALICE' — !r for repr(), !s for str(), !a for ascii()
# Self-documenting (Python 3.8+) — variable name + value
x = 42
print(f"{x = }") # x = 42 (includes variable name!)
# ❌ CRITICAL SECURITY: NEVER treat user input as a format template
user_template = input("Enter template: ") # user enters: {config.__class__.__dict__}
# NEVER: user_template.format(config=config), format_map(), or eval() on user-controlled text.
# f-string literals in your own source are fine: the user controls the values, not the template.
# ✅ Safe template system: use string.Template with $ substitution
from string import Template
t = Template("Hello, $name! Your score is $score.")
result = t.safe_substitute(name=name, score=95) # safe_substitute leaves unknown placeholders untouched
# Template has no attribute or index access, so a template cannot reach beyond the values you pass
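To make the risk concrete, the sketch below shows `str.format` following an attribute lookup chosen by the template author, while `string.Template` leaves the same request untouched. The `Config` class and its `api_key` value are hypothetical placeholders, not real credentials.

```python
from string import Template

class Config:
    """Hypothetical settings object holding a secret."""
    def __init__(self) -> None:
        self.api_key = "sk_test_not_a_real_key"

config = Config()
user_template = "Hello {user}! {config.api_key}"  # attacker-supplied text

# str.format resolves attribute lookups written inside the template
leaked = user_template.format(user="alice", config=config)
print(leaked)  # Hello alice! sk_test_not_a_real_key

# string.Template substitutes plain names only; '.api_key' is just literal text
safe = Template("Hello $user! $config.api_key").safe_substitute(user="alice")
print(safe)  # Hello alice! $config.api_key
```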
11.2 String Methods for Validation
# Know all the string methods used in security validation:
s = "Hello, World! 123"
# Character classification (per-char boolean)
"abc".isalpha() # True — all letters
"123".isdigit() # True, but also True for non-ASCII digits such as "²" or "٣"; add isascii() to accept 0-9 only
"abc123".isalnum() # True — letters and digits
" ".isspace() # True — all whitespace
"HELLO".isupper() # True
"hello".islower() # True
"Hello World".istitle() # True — title case
# Safer: use re for precise control
import re
re.fullmatch(r'[a-zA-Z]{2,50}', name) # only letters, 2-50 chars
re.fullmatch(r'\d{4}-\d{2}-\d{2}', date) # YYYY-MM-DD date format
re.fullmatch(r'[^@\s]+@[^@\s]+\.[^@\s]+', email)
# Strip methods
" hello ".strip() # "hello"
" hello ".lstrip() # "hello "
" hello ".rstrip() # " hello"
"xxhelloxx".strip('x') # "hello" — strips specified chars from both ends
# Case methods
"hello".upper() # "HELLO"
"HELLO".lower() # "hello"
"hello world".title() # "Hello World"
"Hello World".swapcase() # "hELLO wORLD"
"hello WORLD".casefold() # "hello world" — more aggressive than lower() for Unicode
# Membership
"hello".startswith("he") # True
"hello".endswith("lo") # True
"hello".startswith(("he", "wo")) # True — tuple of prefixes
# Partitioning
"user=alice".partition('=') # ('user', '=', 'alice')
"host:port:db".split(':', 1) # ['host', 'port:db'] — split once only!
"host:port:db".split(':') # ['host', 'port', 'db']
11.3 Encoding and Unicode Security
# Encoding
"hello".encode('utf-8') # b'hello'
b"hello".decode('utf-8') # "hello"
"café".encode('utf-8') # b'caf\xc3\xa9'
# Always specify encoding explicitly
with open('file.txt', 'r', encoding='utf-8') as f:
data = f.read()
# Unicode normalisation — CRITICAL for security
import unicodedata
# Homograph attack: different Unicode, same visual appearance
admin1 = "admin" # all ASCII
admin2 = "\u0430dmin" # Cyrillic 'а' in first position
print(admin1 == admin2) # False! Different code points, same look in many fonts
# Normalise before comparing
def safe_compare(a: str, b: str) -> bool:
    return unicodedata.normalize('NFKC', a) == unicodedata.normalize('NFKC', b)
# NFKC folds compatibility forms (fullwidth letters, ligatures, superscripts)
print(safe_compare("admin", "\uff41\uff44\uff4d\uff49\uff4e")) # True: fullwidth chars normalised
# NFKC does NOT map cross-script lookalikes: the Cyrillic 'а' stays Cyrillic
print(safe_compare(admin1, admin2)) # False: reject mixed-script identifiers separately
# HTML escaping (prevent XSS)
import html
user_input = "<script>alert('xss')</script>"
safe = html.escape(user_input) # &lt;script&gt;alert(&#x27;xss&#x27;)&lt;/script&gt;
unsafe = html.unescape(safe) # back to the raw text; never insert this into an HTML page
# URL encoding
from urllib.parse import quote, unquote, urlencode
safe_url = quote("hello world/path", safe="") # "hello%20world%2Fpath" (by default '/' is left unescaped)
safe_qs = urlencode({'name': 'alice', 'q': 'hello world'})
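NFKC normalisation and homograph detection are separate problems. The sketch below shows that NFKC folds fullwidth letters to ASCII but leaves a Cyrillic lookalike alone, and then flags mixed-script strings with a rough heuristic based on Unicode character names. The `scripts` helper is a hypothetical illustration, not a complete confusables check.

```python
import unicodedata

def nfkc(s: str) -> str:
    return unicodedata.normalize("NFKC", s)

fullwidth = "\uff41\uff44\uff4d\uff49\uff4e"  # fullwidth 'admin'
cyrillic = "\u0430dmin"                       # Cyrillic 'а' followed by Latin 'dmin'

print(nfkc(fullwidth) == "admin")  # True
print(nfkc(cyrillic) == "admin")   # False

def scripts(s: str) -> set[str]:
    """Rough script detection: first word of each letter's Unicode name."""
    return {unicodedata.name(ch, "UNKNOWN").split()[0] for ch in s if ch.isalpha()}

def is_single_script(s: str) -> bool:
    return len(scripts(s)) <= 1

print(is_single_script("admin"))   # True
print(is_single_script(cyrillic))  # False
```

For usernames, one option is to normalise with NFKC first and then reject anything that is not single-script.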
12. Injection Attacks — SQL, Command, Path Traversal
12.1 SQL Injection — The Most Common Critical Vulnerability
import sqlite3
# ❌ VULNERABLE — string interpolation / concatenation
def get_user_bad(username: str) -> dict | None:
conn = sqlite3.connect("app.db")
query = f"SELECT * FROM users WHERE username = '{username}'"
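# If username = "' OR '1'='1"
# Query becomes: SELECT * FROM users WHERE username = '' OR '1'='1' (matches every row)
# Stacked payloads such as "'; DROP TABLE users; --" work on drivers that run several
# statements per call; sqlite3's execute() rejects them, but executescript() would run them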
result = conn.execute(query).fetchone()
return result
# ❌ Also vulnerable — .format() and % formatting
query = "SELECT * FROM users WHERE id = %s" % user_id # WRONG
query = "SELECT * FROM users WHERE id = {}".format(user_id) # WRONG
# ✅ SAFE — parameterized queries (placeholders)
def get_user_safe(username: str) -> dict | None:
conn = sqlite3.connect("app.db")
# The ? placeholder binds the value separately from the SQL text, so it is never parsed as SQL
cursor = conn.execute(
"SELECT id, username, role FROM users WHERE username = ?",
(username,) # always a tuple, even for single param
)
row = cursor.fetchone()
if row:
return dict(zip([d[0] for d in cursor.description], row))
return None
# ✅ Multiple parameters
def get_active_users_by_role(role: str, min_age: int) -> list:
conn = sqlite3.connect("app.db")
cursor = conn.execute(
"SELECT username, email FROM users WHERE role = ? AND age >= ? AND active = 1",
(role, min_age)
)
return cursor.fetchall()
# Column and table names CANNOT be parameterized — use allowlisting instead
ALLOWED_COLUMNS = {'username', 'email', 'created_at'}
ALLOWED_TABLES = {'users', 'posts', 'comments'}
def dynamic_query(table: str, column: str) -> str:
if table not in ALLOWED_TABLES:
raise ValueError(f"Invalid table: {table!r}")
if column not in ALLOWED_COLUMNS:
raise ValueError(f"Invalid column: {column!r}")
return f"SELECT {column} FROM {table}" # safe because we allowlisted
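The difference between interpolation and parameter binding can be reproduced with an in-memory database. This is a small sketch; the table contents and the `malicious` input are made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT, role TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "admin"), ("bob", "user")],
)

malicious = "nobody' OR '1'='1"

# Vulnerable: the input becomes part of the SQL text and changes the WHERE clause
unsafe_rows = conn.execute(
    f"SELECT username FROM users WHERE username = '{malicious}'"
).fetchall()

# Safe: the input is bound as a value and compared literally
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?", (malicious,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))  # 2 0
conn.close()
```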
12.2 Command Injection
import subprocess, shlex, os
# ❌ CRITICALLY VULNERABLE: os.system always passes the string to the shell
def ping_bad(host: str) -> int:
    return os.system(f"ping -c 1 {host}")
    # host = "google.com; cat /etc/passwd" → executes BOTH commands!
# ❌ Also vulnerable: shell=True with user input
def ping_bad2(host: str) -> subprocess.CompletedProcess:
    return subprocess.run(f"ping -c 1 {host}", shell=True, capture_output=True)
# ✅ SAFE — list form, no shell
def ping_safe(host: str) -> subprocess.CompletedProcess:
# Validate input first
import re
if not re.fullmatch(r'[a-zA-Z0-9][a-zA-Z0-9.\-]{0,252}', host): # no leading '-', or ping would read it as an option
raise ValueError(f"Invalid hostname: {host!r}")
# List form: OS passes args directly to execve, bypassing the shell
return subprocess.run(
["ping", "-c", "1", host], # each arg is a separate list element
capture_output=True,
text=True,
timeout=5 # always set timeout for external commands!
)
# ✅ If you MUST use shell=True, use shlex.quote (POSIX systems only)
def grep_safe_but_fragile(pattern: str, filepath: str) -> str:
# shlex.quote wraps the argument in single quotes and escapes internal quotes
safe_pattern = shlex.quote(pattern)
safe_filepath = shlex.quote(filepath)
result = subprocess.run(
f"grep {safe_pattern} {safe_filepath}",
shell=True, capture_output=True, text=True
)
return result.stdout
# Still prefer the list form — shlex.quote has edge cases on Windows
12.3 Path Traversal
import os
from pathlib import Path
BASE_DIR = Path("/var/app/uploads").resolve()
# ❌ VULNERABLE — naive path join
def serve_file_bad(filename: str) -> bytes:
path = os.path.join("/var/app/uploads", filename)
# filename = "../../etc/passwd" → "/var/app/uploads/../../etc/passwd" → "/etc/passwd"
with open(path, 'rb') as f:
return f.read()
# ✅ SAFE — resolve and validate the final path
def serve_file_safe(filename: str) -> bytes:
    # Resolve symlinks and normalise the path
    requested = (BASE_DIR / filename).resolve()
    # The final resolved path MUST be inside BASE_DIR. Compare path components,
    # not strings: a startswith() check would also accept /var/app/uploads_evil/...
    if not requested.is_relative_to(BASE_DIR):  # Python 3.9+
        raise PermissionError(f"Access denied: {filename!r}")
    # Additional validation
    if not requested.is_file():
        raise FileNotFoundError(f"File not found: {filename!r}")
    with open(requested, 'rb') as f:
        return f.read()
# Test cases:
# serve_file_safe("report.pdf") → OK
# serve_file_safe("../../etc/passwd") → PermissionError
# serve_file_safe("/etc/shadow") → PermissionError (absolute path attack)
# Using pathlib (cleaner):
def serve_file_pathlib(filename: str) -> bytes:
safe_path = BASE_DIR / Path(filename).name # .name strips directory components!
# Path("../../etc/passwd").name == "passwd" — only the filename!
if not safe_path.is_file(): # is_file() also rejects "" and "..", which would point at a directory
raise FileNotFoundError(filename)
return safe_path.read_bytes()
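A string-prefix check and a component-wise check disagree when a sibling directory shares the base directory's name as a prefix. The sketch below builds such a layout in a temporary directory; `is_inside` and the directory names are hypothetical, and `Path.is_relative_to` requires Python 3.9 or newer.

```python
import tempfile
from pathlib import Path

def is_inside(base: Path, user_path: str) -> bool:
    """True if base/user_path resolves to a location inside base."""
    base = base.resolve()
    candidate = (base / user_path).resolve()
    return candidate.is_relative_to(base)

with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "uploads"
    sibling = Path(tmp) / "uploads_private"  # shares the string prefix of base
    base.mkdir()
    sibling.mkdir()
    (base / "report.pdf").write_bytes(b"ok")

    results = {
        "report.pdf": is_inside(base, "report.pdf"),
        "../uploads_private/x": is_inside(base, "../uploads_private/x"),
        "/etc/passwd": is_inside(base, "/etc/passwd"),
    }
    # The naive string comparison wrongly accepts the sibling directory
    escaped = (base / "../uploads_private/x").resolve()
    naive = str(escaped).startswith(str(base.resolve()))

print(results)
print(naive)  # True: the prefix check is fooled
```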
13. Password & Secret Handling
13.1 Why You Must NEVER Use SHA-256 for Passwords
import hashlib, time
# The fundamental problem: SHA-256 is DESIGNED to be fast
# Performance test (illustrative):
password = "hunter2"
start = time.perf_counter()
for _ in range(1_000_000):
hashlib.sha256(password.encode()).hexdigest()
elapsed = time.perf_counter() - start
print(f"SHA-256: 1M hashes in {elapsed:.2f}s")
# Result: ~0.5s on a laptop CPU
# Modern GPUs: BILLIONS of SHA-256 hashes per second
# Dictionary attack on a stolen SHA-256 password hash: seconds to minutes
# Password hashing requirements:
# 1. SLOW — intentionally high work factor
# 2. SALTED — unique random value per password (prevents rainbow tables)
# 3. ADAPTIVE — work factor can be increased as hardware improves
# These properties are provided by: bcrypt, scrypt, Argon2
# ❌ WRONG password storage methods:
hashlib.md5(password.encode()) # fast + broken cryptography
hashlib.sha256(password.encode()) # fast (see above)
hashlib.sha256((password + "salt").encode()) # fast — salt doesn't fix speed
13.2 bcrypt — Industry Standard
import bcrypt # pip install bcrypt
password = "MySecurePassword!123"
# Hash — slow by design (12 rounds = ~250ms on modern hardware)
# rounds: each increment doubles the work — 12 is a good default (2^12 iterations)
hashed = bcrypt.hashpw(
password.encode('utf-8'),
bcrypt.gensalt(rounds=12)
)
# b'$2b$12$...' — 60-byte string containing: algorithm, rounds, salt, hash
print(type(hashed)) # bytes
print(len(hashed)) # 60
# Verify — constant-time comparison built in
def check_password(plain: str, hashed_pw: bytes) -> bool:
return bcrypt.checkpw(plain.encode('utf-8'), hashed_pw)
print(check_password("MySecurePassword!123", hashed)) # True
print(check_password("wrong_password", hashed)) # False
# Storage: store the full hashed bytes (or decode to str for varchar columns)
stored_hash = hashed.decode('utf-8') # safe to store as string
retrieved = stored_hash.encode('utf-8') # convert back for checkpw
13.3 Python's hashlib.scrypt (Built-in)
import hashlib, hmac, os
# scrypt — built into Python 3.6+ standard library (no extra package!)
password = b"MySecurePassword!123"
salt = os.urandom(32) # ALWAYS use a random salt, never hardcode
# Parameters:
# n = CPU/memory cost (must be power of 2, min 2^14 for interactive)
# r = block size (8 is standard)
# p = parallelisation factor
hashed = hashlib.scrypt(password, salt=salt, n=2**14, r=8, p=1, dklen=64)
# Store salt + hashed together (both needed for verification)
def hash_password_scrypt(plain: str) -> tuple[bytes, bytes]:
salt = os.urandom(32)
hashed = hashlib.scrypt(plain.encode(), salt=salt, n=2**14, r=8, p=1, dklen=64)
return salt, hashed
def verify_password_scrypt(plain: str, salt: bytes, stored_hash: bytes) -> bool:
computed = hashlib.scrypt(plain.encode(), salt=salt, n=2**14, r=8, p=1, dklen=64)
return hmac.compare_digest(computed, stored_hash) # constant-time!
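Because the salt and the cost parameters are all needed for verification, it can help to store them with the hash in one string. The layout below (`scrypt$n$r$p$salt$hash`) is a hypothetical format chosen for this sketch, not a standard; the function names are likewise illustrative.

```python
import base64
import hashlib
import hmac
import os

N, R, P, DKLEN = 2**14, 8, 1, 64

def _b64(data: bytes) -> str:
    return base64.b64encode(data).decode("ascii")

def hash_password(plain: str) -> str:
    salt = os.urandom(16)
    digest = hashlib.scrypt(plain.encode(), salt=salt, n=N, r=R, p=P, dklen=DKLEN)
    # Standard base64 never contains '$', so '$' is a safe field separator
    return f"scrypt${N}${R}${P}${_b64(salt)}${_b64(digest)}"

def verify_password(plain: str, stored: str) -> bool:
    scheme, n, r, p, salt_b64, digest_b64 = stored.split("$")
    if scheme != "scrypt":
        return False
    salt = base64.b64decode(salt_b64)
    expected = base64.b64decode(digest_b64)
    # Recompute with the parameters recorded at hashing time
    computed = hashlib.scrypt(
        plain.encode(), salt=salt, n=int(n), r=int(r), p=int(p), dklen=len(expected)
    )
    return hmac.compare_digest(computed, expected)

stored = hash_password("correct horse")
print(verify_password("correct horse", stored))  # True
print(verify_password("wrong guess", stored))    # False
```

Recording the parameters per hash means the work factor can be raised later without invalidating existing entries.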
13.4 Secrets Module — Cryptographically Secure Randomness
import secrets, random, time
# ❌ WRONG for security purposes
token_bad = random.randint(0, 2**32) # NOT cryptographically random!
token_bad2 = str(hash(time.time())) # predictable, collision-prone
# ✅ CORRECT — secrets module uses OS CSPRNG (urandom)
token_hex = secrets.token_hex(32) # 64 hex chars = 256 bits of entropy
token_b64 = secrets.token_urlsafe(32) # URL-safe base64 = ~43 chars
token_raw = secrets.token_bytes(32) # 32 raw bytes
# Use cases:
session_id = secrets.token_urlsafe(32) # session token
reset_token = secrets.token_hex(32) # password reset link token
csrf_token = secrets.token_hex(16) # CSRF protection
api_key = secrets.token_urlsafe(48) # API key
# secrets.choice — for generating passwords/pins
import string
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
# Pick 16 characters at random — each pick uses CSPRNG
password = ''.join(secrets.choice(alphabet) for _ in range(16))
# secrets.compare_digest (wraps hmac.compare_digest) — constant-time comparison
def safe_compare_tokens(a: str, b: str) -> bool:
return secrets.compare_digest(a, b) # resists timing attacks
13.5 Environment Variables & Secret Management
import os
from functools import lru_cache
# ❌ NEVER hardcode secrets
DATABASE_URL = "postgresql://admin:password123@db.internal/myapp"
API_KEY = "sk_live_abcdef123456"
# ✅ Read from environment
DATABASE_URL = os.environ["DATABASE_URL"] # raises KeyError if missing — GOOD
API_KEY = os.getenv("API_KEY", "") # returns "" if missing
# ✅ Validate at startup — fail fast if config is incomplete
def load_config() -> dict:
required = ["DATABASE_URL", "SECRET_KEY", "REDIS_URL"]
missing = [key for key in required if not os.getenv(key)]
if missing:
raise EnvironmentError(f"Missing required environment variables: {missing}")
return {key: os.environ[key] for key in required}
# ✅ Never log secrets
import logging
secret_key = os.environ["SECRET_KEY"]
logging.info(f"Using key: {secret_key[:8]}...") # log only prefix!
# logging.info(f"Key: {secret_key}") ← NEVER DO THIS
# ✅ Use python-dotenv for local development only
# from dotenv import load_dotenv
# load_dotenv() # loads .env file — .env should be in .gitignore!
14. Cryptography Fundamentals in Python
14.1 HMAC — Message Authentication Code
import hmac, hashlib, secrets
# HMAC: proves a message came from someone with the shared key AND wasn't modified
# It does NOT encrypt — the message is visible, but authenticated
key = secrets.token_bytes(32) # shared secret key
message = b"user_id=42;role=admin;expires=1700000000"
# Create MAC
mac = hmac.new(key, message, hashlib.sha256).digest() # 32 bytes
# Verify on receipt — MUST use compare_digest, never ==
def verify_mac(key: bytes, message: bytes, received_mac: bytes) -> bool:
expected = hmac.new(key, message, hashlib.sha256).digest()
return hmac.compare_digest(expected, received_mac) # constant-time!
# Tamper-evident cookies (simplified):
def sign_cookie(data: str, key: bytes) -> str:
message = data.encode()
mac = hmac.new(key, message, hashlib.sha256).hexdigest()
return f"{data}.{mac}"
def verify_cookie(cookie: str, key: bytes) -> str | None:
if '.' not in cookie:
return None
data, received_mac = cookie.rsplit('.', 1)
expected_mac = hmac.new(key, data.encode(), hashlib.sha256).hexdigest()
if hmac.compare_digest(expected_mac, received_mac):
return data
return None # tampered!
14.2 Symmetric Encryption — Fernet (AES-128-CBC)
from cryptography.fernet import Fernet, InvalidToken # pip install cryptography
# Generate a key — store this securely (env var, secrets manager)
key = Fernet.generate_key() # 32 random bytes, URL-safe base64-encoded (44 characters)
# Safe to store as a string; treat it like any other secret
f = Fernet(key)
# Encrypt — returns ciphertext (bytes)
plaintext = b"This is sensitive PII data"
ciphertext = f.encrypt(plaintext)
# Each call produces different ciphertext (random IV); tokens look like b'gAAAAAB...'
# Decrypt
try:
decrypted = f.decrypt(ciphertext)
print(decrypted) # b"This is sensitive PII data"
except InvalidToken:
print("Decryption failed — data tampered or wrong key!")
# ✅ Fernet provides:
# 1. Confidentiality — AES-128-CBC encryption
# 2. Integrity — HMAC-SHA256 authentication (detects tampering)
# 3. Freshness — includes a timestamp (optional TTL enforcement)
# With TTL (reject tokens older than 60 seconds):
try:
decrypted = f.decrypt(ciphertext, ttl=60)
except InvalidToken:
print("Token expired or invalid")
# Rotating keys — key rotation without losing old data
from cryptography.fernet import MultiFernet
old_key = Fernet.generate_key()
new_key = Fernet.generate_key()
mf = MultiFernet([Fernet(new_key), Fernet(old_key)])
# Encrypts with new_key, decrypts with either
14.3 Hashing for Integrity (Non-Password Use)
import hashlib, hmac
# SHA-256 IS appropriate for:
# - File integrity verification (is this download uncorrupted?)
# - Content-addressable storage
# - Generating deterministic IDs from content
# NOT appropriate for: passwords (see section 13)
def file_checksum(filepath: str) -> str:
"""Compute SHA-256 hash of a file efficiently."""
h = hashlib.sha256()
with open(filepath, 'rb') as f:
# Read in chunks — handles large files without loading into memory
for chunk in iter(lambda: f.read(65536), b''):
h.update(chunk)
return h.hexdigest()
# Verify a downloaded file
def verify_download(filepath: str, expected_sha256: str) -> bool:
actual = file_checksum(filepath)
return hmac.compare_digest(actual, expected_sha256.lower())
# Content-based cache key
def cache_key(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()[:16] # first 16 hex chars = 64-bit key
# SHA-3 (Keccak) — alternative to SHA-2, different design
sha3_hash = hashlib.sha3_256(b"data").hexdigest()
15. Authentication — JWT, Sessions, Tokens
15.1 JWT Deep Dive
import jwt # pip install PyJWT
import time
import os
import secrets
from datetime import datetime, timezone, timedelta
SECRET_KEY = os.environ["JWT_SECRET"] # fail fast: a hardcoded fallback would let anyone forge tokens
# JWT structure: base64url(header).base64url(payload).signature
# Header: {"alg":"HS256","typ":"JWT"}
# Payload: {"sub":"user_123","iat":...,"exp":...}
# Signature: HMAC-SHA256(header.payload, secret)
# CRITICAL: payload is NOT encrypted — only signed — anyone can decode it!
def create_access_token(user_id: str, role: str = "user") -> str:
now = datetime.now(timezone.utc)
payload = {
"sub": user_id, # subject — who this token is for
"role": role, # custom claim
"iat": now, # issued at
"exp": now + timedelta(hours=1), # expiration — ALWAYS set this!
"jti": secrets.token_hex(8), # JWT ID — enables revocation
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_access_token(token: str) -> dict:
try:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=["HS256"], # ALWAYS specify allowed algorithms!
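# Pin this list in code and never take it from the token header; otherwise an
# attacker can pick the algorithm (e.g. alg=none) and forge any token.
# PyJWT 2.x raises DecodeError if algorithms= is omitted while verifying.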
)
return payload
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token has expired")
except jwt.InvalidTokenError as e:
raise AuthenticationError(f"Invalid token: {e}")
# Token refresh pattern
def create_refresh_token(user_id: str) -> str:
payload = {
"sub": user_id,
"type": "refresh", # mark as refresh token
"iat": datetime.now(timezone.utc),
"exp": datetime.now(timezone.utc) + timedelta(days=30),
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
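The `header.payload.signature` structure described in 15.1 can be reproduced with the standard library alone, which makes it clear that the payload is readable by anyone and that only the signature protects it. This is an illustration of the format under a demo secret, not a replacement for PyJWT; the helper names are hypothetical.

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))

def sign_hs256(payload: dict, secret: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    sig = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(sig)}"

secret = b"demo-secret"  # placeholder value for the example only
token = sign_hs256({"sub": "user_123", "role": "user"}, secret)
header_b64, payload_b64, sig_b64 = token.split(".")

# Anyone holding the token can read the payload without knowing the secret
decoded = json.loads(b64url_decode(payload_b64))
print(decoded)  # {'sub': 'user_123', 'role': 'user'}

# Swapping in a different payload no longer matches the original signature
forged_payload = b64url(json.dumps({"sub": "user_123", "role": "admin"}).encode())
forged_input = f"{header_b64}.{forged_payload}".encode("ascii")
expected_sig = hmac.new(secret, forged_input, hashlib.sha256).digest()
forged_ok = hmac.compare_digest(expected_sig, b64url_decode(sig_b64))
print(forged_ok)  # False
```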
15.2 JWT Vulnerabilities and Mitigations
# Vulnerability 1: Algorithm confusion (alg=none)
# An attacker strips the signature and sets alg=none in the header
# MITIGATION: always specify algorithms=["HS256"] in decode()
# Vulnerability 2: HS256 with public RSA key
# If you switch from HS256 to RS256, an attacker might send an HS256 token
# signed with the PUBLIC key (which they can see!)
# MITIGATION: be explicit about which algorithms are accepted per token type
# Vulnerability 3: No expiration
# A stolen token works forever
# MITIGATION: always set exp claim, keep it short (1h for access, 30d for refresh)
# Vulnerability 4: Sensitive data in payload
# JWT payload is just base64 — anyone with the token can read it
bad_payload = {
"sub": "user_123",
"password_hash": "$2b$12$...", # ← NEVER DO THIS
"ssn": "123-45-6789", # ← NEVER DO THIS
"credit_card": "4111111111111111" # ← NEVER DO THIS
}
# Vulnerability 5: Missing issuer/audience validation
# A token from service A might be replayed against service B
payload = {
"sub": "user_123",
"iss": "https://auth.myapp.com", # issuer
"aud": "api.myapp.com", # intended audience
}
jwt.decode(token, key, algorithms=["HS256"],
issuer="https://auth.myapp.com",
audience="api.myapp.com") # validates both claims!
16. Timing Attacks & Side Channels
16.1 How Timing Attacks Work
import time, hmac, secrets
# The vulnerability: early-exit string comparison reveals information
# through execution time differences
# ❌ VULNERABLE comparison
def check_api_key_bad(submitted: str, real_key: str) -> bool:
if len(submitted) != len(real_key):
return False # ← returns FAST for wrong-length input — leaks length!
for a, b in zip(submitted, real_key):
if a != b:
return False # ← returns faster when early mismatch found
return True
# An attacker sends millions of requests with different prefixes,
# measures response time, and determines the correct key byte by byte:
#
# "aaaa..." → 0.001ms (fails at pos 0)
# "baaa..." → 0.001ms (fails at pos 0)
# "zaaa..." → 0.002ms (pos 0 matches! fails at pos 1)
# "zbaa..." → 0.002ms (fails at pos 1)
# ...
# Eventually reconstructs the full key through timing measurements
# ✅ SAFE — constant-time comparison regardless of mismatch position
def check_api_key_safe(submitted: str, real_key: str) -> bool:
return hmac.compare_digest(submitted, real_key)
# Equivalent:
def check_api_key_safe2(submitted: str, real_key: str) -> bool:
return secrets.compare_digest(submitted, real_key)
# How compare_digest works (simplified):
# It XORs all byte pairs and accumulates the result
# ALWAYS processes ALL bytes — no early exit
# The result is 0 only if ALL bytes match
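The XOR-and-accumulate idea can be written out in a few lines. This is a teaching sketch only: pure Python gives no real constant-time guarantee, so production code should keep calling `hmac.compare_digest`. The function name is hypothetical.

```python
import hmac

def constant_time_equal(a: bytes, b: bytes) -> bool:
    """Illustrative only; use hmac.compare_digest in real code."""
    if len(a) != len(b):
        return False  # the length is treated as non-secret in this sketch
    diff = 0
    for x, y in zip(a, b):
        diff |= x ^ y  # any differing bit sets a bit in diff; no early exit
    return diff == 0

print(constant_time_equal(b"s3cret-key", b"s3cret-key"))  # True
print(constant_time_equal(b"s3cret-key", b"s3cret-kez"))  # False
# Agrees with the standard-library implementation
print(hmac.compare_digest(b"s3cret-key", b"s3cret-key"))  # True
```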
16.2 User Enumeration — An Information Side Channel
# If your app says "user not found" vs "wrong password",
# attackers can enumerate which usernames exist!
# ❌ VULNERABLE — reveals whether username exists
def login_bad(username: str, password: str) -> dict:
user = db.find_user(username)
if user is None:
return {"error": "User not found"} # leaks: this username doesn't exist
if not bcrypt.checkpw(password.encode(), user.password_hash):
return {"error": "Incorrect password"} # leaks: this username EXISTS
return {"token": generate_token(user)}
# ✅ SAFE — same error message for both failure cases
# A real bcrypt hash of a throwaway value, computed ONCE at import time.
# Use the same rounds as real user hashes so both paths cost the same.
DUMMY_HASH = bcrypt.hashpw(secrets.token_hex(16).encode(), bcrypt.gensalt(rounds=12))
def login_safe(username: str, password: str) -> dict:
    user = db.find_user(username)
    # ALWAYS run bcrypt.checkpw, even when the user doesn't exist,
    # so both failure paths take about the same time (bcrypt is slow)
    if user is None:
        bcrypt.checkpw(password.encode(), DUMMY_HASH)  # waste equal time
        raise AuthenticationError("Invalid username or password")
    if not bcrypt.checkpw(password.encode(), user.password_hash):
        raise AuthenticationError("Invalid username or password")  # SAME message!
    return {"token": generate_token(user)}
# Same principle for password reset:
# ❌ "Email not found" vs "Reset email sent"
# ✅ Always: "If that email is registered, you'll receive a reset link"
16.3 Rate Limiting and Brute Force Protection
import time
from collections import defaultdict
import threading
class RateLimiter:
"""Sliding-window rate limiter (keeps a log of request timestamps per key); thread-safe."""
def __init__(self, max_requests: int, window_seconds: int):
self.max = max_requests
self.window = window_seconds
self._data = defaultdict(list) # ip → list of timestamps
self._lock = threading.Lock()
def is_allowed(self, key: str) -> tuple[bool, int]:
"""Returns (allowed, seconds_until_reset)."""
now = time.time()
cutoff = now - self.window
with self._lock:
# Remove expired timestamps
self._data[key] = [t for t in self._data[key] if t > cutoff]
if len(self._data[key]) >= self.max:
oldest = self._data[key][0]
retry_after = int(self.window - (now - oldest)) + 1
return False, retry_after
self._data[key].append(now)
return True, 0
# Usage in a login endpoint:
limiter = RateLimiter(max_requests=5, window_seconds=300) # 5 per 5 minutes
def protected_login(ip: str, username: str, password: str) -> dict:
allowed, retry_after = limiter.is_allowed(ip)
if not allowed:
raise RateLimitError(retry_after=retry_after)
return login_safe(username, password)
17. Dangerous Python Patterns to Avoid
17.1 eval, exec, and compile — Never on Untrusted Input
# eval() executes arbitrary Python expressions
# exec() executes arbitrary Python statements (including imports, class definitions)
# compile() prepares code objects for exec() or eval()
# ❌ CATASTROPHICALLY DANGEROUS
eval(input("Enter expression: "))
exec(user_provided_code_string)
# ❌ Trying to "sandbox" eval — this does NOT work
eval(expr, {"__builtins__": {}})
# Attackers bypass this with:
# ().__class__.__bases__[0].__subclasses__() — accesses all classes via MRO
# Can still reach file system, network, etc.
# ✅ Safe alternatives for common eval use cases:
# Math expressions:
import ast
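def safe_eval_math(expr: str) -> float:
    tree = ast.parse(expr, mode='eval')
    # Allowlist of node types (ast.Num was removed in Python 3.12; ast.Constant replaces it)
    ALLOWED = {
        ast.Expression, ast.BinOp, ast.UnaryOp, ast.Constant,
        ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Pow, ast.USub
    }
    for node in ast.walk(tree):
        if type(node) not in ALLOWED:
            raise ValueError(f"Forbidden expression: {type(node).__name__}")
        # ast.Constant also covers strings and bytes, so accept numbers only
        if isinstance(node, ast.Constant) and type(node.value) not in (int, float):
            raise ValueError("Only numeric literals are allowed")
    # Caveat: ast.Pow still permits huge results (9**9**9); cap input size or drop it
    return eval(compile(tree, '<expr>', 'eval'))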
safe_eval_math("2 + 2 * 3") # 8 — OK
# safe_eval_math("__import__('os')") → ValueError: Forbidden
# Literal data structures only:
import ast
data = ast.literal_eval("{'key': [1, 2, 3], 'flag': True}") # safe
# Only handles: strings, bytes, numbers, tuples, lists, dicts, sets, booleans, None
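An alternative that avoids `eval` entirely is to walk the AST and apply operators from an explicit table. The sketch below supports only `+ - * /` and unary signs on numeric literals; the function name and operator tables are hypothetical choices for this example.

```python
import ast
import operator

_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.USub: operator.neg, ast.UAdd: operator.pos}

def eval_arithmetic(expr: str) -> float:
    """Evaluate a basic arithmetic expression without calling eval()."""
    def walk(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return walk(node.body)
        # type() check (not isinstance) so that True/False are rejected too
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](walk(node.operand))
        raise ValueError(f"Forbidden expression: {type(node).__name__}")
    return walk(ast.parse(expr, mode="eval"))

print(eval_arithmetic("2 + 2 * 3"))  # 8
print(eval_arithmetic("-(10 / 4)"))  # -2.5
```

Anything outside the table, including function calls, names, strings and `**`, raises `ValueError` before any computation happens.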
17.2 Pickle — Deserialisation of Untrusted Data
import pickle, json
# ❌ CRITICAL VULNERABILITY — pickle can execute arbitrary code
class Exploit:
    def __reduce__(self):
        import os
        return (os.system, ("echo OWNED && cat /etc/passwd",))
payload = pickle.dumps(Exploit())
# pickle.loads(payload) → executes os.system("echo OWNED...")!
# Any user-controlled data run through pickle.loads is RCE
# This includes: uploaded files, cached values from Redis, cookies, API payloads
# ✅ Use JSON for data exchange
safe_data = json.loads(user_provided_json) # executes no code
# ✅ If you MUST use pickle, sign it first
import hmac, hashlib
def safe_pickle_dumps(obj: object, key: bytes) -> bytes:
    data = pickle.dumps(obj)
    mac = hmac.new(key, data, hashlib.sha256).digest()
    return mac + data  # prepend MAC

def safe_pickle_loads(signed_data: bytes, key: bytes) -> object:
    mac, data = signed_data[:32], signed_data[32:]  # SHA-256 digest is 32 bytes
    expected = hmac.new(key, data, hashlib.sha256).digest()
    if not hmac.compare_digest(mac, expected):
        raise ValueError("Pickle data tampered or invalid key")
    return pickle.loads(data)  # only safe because we verified the MAC
# Alternatives to pickle: json, msgpack, protobuf, orjson
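Signing proves who produced the bytes. A complementary defence is to limit what the unpickler may import, following the "restricting globals" pattern from the `pickle` documentation. This is a hedged sketch: the allowlist contents and the names `RestrictedUnpickler`, `restricted_loads` and `Probe` are illustrative assumptions.

```python
import builtins
import io
import os
import pickle

# Assumed allowlist for illustration; extend it only with types you truly need.
SAFE_BUILTINS = {"range", "complex", "set", "frozenset", "slice"}

class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module: str, name: str):
        # Called whenever the pickle stream requests a global (class or function).
        if module == "builtins" and name in SAFE_BUILTINS:
            return getattr(builtins, name)
        raise pickle.UnpicklingError(f"global '{module}.{name}' is forbidden")

def restricted_loads(data: bytes) -> object:
    """Like pickle.loads(), but refuses any global outside the allowlist."""
    return RestrictedUnpickler(io.BytesIO(data)).load()

class Probe:
    """Harmless stand-in for an exploit: asks the unpickler to call os.getcwd()."""
    def __reduce__(self):
        return (os.getcwd, ())

print(restricted_loads(pickle.dumps({"ids": [1, 2, 3]})))  # plain data still loads
try:
    restricted_loads(pickle.dumps(Probe()))
except pickle.UnpicklingError as exc:
    print(f"Blocked: {exc}")
```

The lookup is rejected before the callable is ever invoked, so the payload never runs. This reduces the attack surface but is not a full sandbox, so JSON remains the safer default for untrusted data.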
17.3 Other Dangerous Patterns
import yaml # PyYAML
# ❌ yaml.load on untrusted data — similar to pickle!
data = yaml.load(user_input, Loader=yaml.Loader) # DANGEROUS — can call constructors
# ✅ Use safe loader
data = yaml.safe_load(user_input) # only basic Python types, no constructors
data = yaml.load(user_input, Loader=yaml.SafeLoader) # explicit
# ❌ assert for security checks — disabled with -O flag!
assert user.is_authenticated, "Not authenticated" # COMPLETELY BYPASSED with python -O
# ✅ Use proper exceptions
if not user.is_authenticated:
    raise AuthenticationError("Authentication required")
# ❌ Logging sensitive data
import logging
logging.debug(f"Processing request: user={user.username} password={password}")
logging.info(f"Auth token: {token}")
# ✅ Log what's needed, redact the rest
logging.info(f"Auth attempt for user: {user.username}")
logging.debug(f"Token prefix: {token[:8]}...")
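Manual discipline is easy to forget, so a logging filter can act as a safety net. The sketch below masks anything that follows `password=` or `token=`; the class name `RedactSecrets`, the logger name and the pattern are assumptions for illustration and will not catch every format (for example `Auth token: ...`).

```python
import logging
import re

class RedactSecrets(logging.Filter):
    """Mask values that follow 'password=' or 'token=' in log messages."""
    PATTERN = re.compile(r"\b(password|token)=\S+", re.IGNORECASE)

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the message first so %-style arguments are redacted too.
        record.msg = self.PATTERN.sub(r"\1=[REDACTED]", record.getMessage())
        record.args = ()
        return True  # keep the record, now redacted

logger = logging.getLogger("auth_demo")  # hypothetical logger name
handler = logging.StreamHandler()
handler.addFilter(RedactSecrets())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("login user=%s password=%s", "alice", "hunter2")
```

Attaching the filter to the handler means every record passing through that handler is scrubbed, whichever module logged it. Treat it as a backstop, not a replacement for keeping secrets out of log calls in the first place.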
# ❌ Wildcard imports in security-sensitive code
from crypto_utils import * # What did you import? Can attackers control it?
# ✅ Explicit imports
from crypto_utils import encrypt, decrypt, verify_mac
# ❌ Mutable default arguments (a Python gotcha)
def add_to_blacklist(ip: str, blacklist: list = []) -> list:  # shared between calls!
    blacklist.append(ip)
    return blacklist
# ✅ Use None and create fresh default each call
def add_to_blacklist(ip: str, blacklist: list | None = None) -> list:
    if blacklist is None:
        blacklist = []
    blacklist.append(ip)
    return blacklist
18. Practice Problems with Full Solutions
Problem A — Secure User Registration Pipeline
Problem: Build a complete register_user() function that validates all inputs using built-ins (any, all, filter, map), hashes the password, and raises specific custom exceptions.
import re, bcrypt, secrets
from dataclasses import dataclass
class RegistrationError(Exception):
    def __init__(self, message: str, fields: list[str]):
        super().__init__(message)
        self.fields = fields

@dataclass
class NewUser:
    username: str
    email: str
    password_hash: bytes

def register_user(username: str, email: str, password: str, confirm: str) -> NewUser:
    errors = []
    # Validate username
    if not re.fullmatch(r'[a-zA-Z0-9_]{3,32}', username):
        errors.append("username: 3–32 alphanumeric chars/underscores only")
    # Validate email (simplified)
    if not re.fullmatch(r'[^@\s]+@[^@\s]+\.[^@\s]+', email.strip()):
        errors.append("email: invalid format")
    # Validate password using any(); each rule is (passed?, message)
    password_rules = [
        (len(password) >= 12, "at least 12 characters"),
        (any(c.isupper() for c in password), "at least one uppercase letter"),
        (any(c.islower() for c in password), "at least one lowercase letter"),
        (any(c.isdigit() for c in password), "at least one digit"),
        (any(c in '!@#$%^&*()_+-=' for c in password), "at least one special character"),
    ]
    pw_errors = [msg for ok, msg in password_rules if not ok]
    if pw_errors:
        errors.append(f"password: requires {', '.join(pw_errors)}")
    # Confirm password matches
    if password != confirm:
        errors.append("confirm_password: passwords do not match")
    if errors:
        raise RegistrationError("Registration failed", fields=errors)
    # Hash password only after all validation passes
    hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(rounds=12))
    return NewUser(
        username=username.strip(),
        email=email.strip().lower(),
        password_hash=hashed
    )

# Test it:
try:
    user = register_user("alice_99", "alice@example.com", "MyPass!2024ok", "MyPass!2024ok")
    print(f"Registered: {user.username}")
except RegistrationError as e:
    for field_error in e.fields:
        print(f" ✗ {field_error}")
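The problem statement also names `all`, `filter` and `map`. One way to bring them in, sketched here under the same password policy, is to store each rule as a predicate plus message. The names `PASSWORD_RULES`, `is_strong` and `failed_rules` are illustrative and not part of the solution above.

```python
# Each rule pairs a predicate with the message shown when it fails.
PASSWORD_RULES = [
    (lambda p: len(p) >= 12, "at least 12 characters"),
    (lambda p: any(c.isupper() for c in p), "at least one uppercase letter"),
    (lambda p: any(c.islower() for c in p), "at least one lowercase letter"),
    (lambda p: any(c.isdigit() for c in p), "at least one digit"),
    (lambda p: any(c in "!@#$%^&*()_+-=" for c in p), "at least one special character"),
]

def is_strong(password: str) -> bool:
    """True only when every rule passes (all() short-circuits on the first failure)."""
    return all(check(password) for check, _ in PASSWORD_RULES)

def failed_rules(password: str) -> list[str]:
    """Messages for the rules that fail: filter picks them, map extracts the text."""
    failing = filter(lambda rule: not rule[0](password), PASSWORD_RULES)
    return list(map(lambda rule: rule[1], failing))

print(is_strong("MyPass!2024ok"))  # True
print(failed_rules("short1"))
# ['at least 12 characters', 'at least one uppercase letter', 'at least one special character']
```

Storing predicates instead of precomputed booleans lets the same rule table serve both the quick yes/no check and the detailed error report.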
Problem B — Log Parser with Functional Built-ins
Problem: Parse a list of access log entries and produce a security report: top-5 IPs with most 4xx errors, sorted by count descending.
from collections import Counter
logs = [
"192.168.1.1 - GET /login 200",
"203.0.113.5 - POST /login 401",
"203.0.113.5 - POST /login 401",
"203.0.113.5 - GET /admin 403",
"10.0.0.1 - GET /data 200",
"203.0.113.5 - GET /etc/passwd 404",
"192.168.1.1 - GET /api 401",
"198.51.100.1 - POST /login 401",
"198.51.100.1 - POST /login 401",
"203.0.113.5 - GET /wp-admin 404",
]
def parse_entry(line: str) -> dict:
    parts = line.split()
    return {
        'ip': parts[0],
        'method': parts[2],
        'path': parts[3],
        'status': int(parts[4]),
    }

def security_report(logs: list[str], top_n: int = 5) -> list[dict]:
    # 1. Parse all entries
    parsed = list(map(parse_entry, logs))
    # 2. Filter to 4xx status codes only
    client_errors = list(filter(lambda e: 400 <= e['status'] < 500, parsed))
    # 3. Count errors per IP
    ip_counts = Counter(e['ip'] for e in client_errors)
    # 4. Sort by count descending, then by IP so ties are ordered deterministically
    top_ips = sorted(
        ip_counts.items(),
        key=lambda pair: (-pair[1], pair[0])  # negative count → descending
    )[:top_n]
    # 5. Build report with enumerate for rank
    return [
        {'rank': rank, 'ip': ip, 'error_count': count}
        for rank, (ip, count) in enumerate(top_ips, start=1)
    ]

report = security_report(logs)
for entry in report:
    print(f"#{entry['rank']}: {entry['ip']} — {entry['error_count']} errors")
# #1: 203.0.113.5 — 5 errors
# #2: 198.51.100.1 — 2 errors
# #3: 192.168.1.1 — 1 errors
Problem C — Exception Chain: Database Config Loader
Problem: Write a load_database_config() function that reads from an env var, parses it, and uses proper exception chaining to wrap lower-level errors in application-level errors.
import os, re
from urllib.parse import urlparse
class ConfigError(Exception):
    pass

class DatabaseConfig:
    def __init__(self, host, port, dbname, user, password):
        self.host = host
        self.port = port
        self.dbname = dbname
        self.user = user
        self.password = password
    def __repr__(self):
        # The password is deliberately left out of the repr
        return f"DatabaseConfig(host={self.host!r}, port={self.port}, db={self.dbname!r}, user={self.user!r})"

def load_database_config() -> DatabaseConfig:
    # Step 1: Get environment variable
    try:
        url_str = os.environ["DATABASE_URL"]
    except KeyError as e:
        raise ConfigError("DATABASE_URL environment variable is not set") from e
    # Step 2: Parse the URL
    try:
        url = urlparse(url_str)
        if url.scheme not in ('postgresql', 'postgres', 'mysql'):
            raise ValueError(f"Unsupported scheme: {url.scheme!r}")
        host = url.hostname
        port = url.port or (5432 if 'postgres' in url.scheme else 3306)
        dbname = url.path.lstrip('/')
        user = url.username
        password = url.password
        # Validate required components
        missing = [name for name, val in [
            ('host', host), ('dbname', dbname), ('user', user), ('password', password)
        ] if not val]
        if missing:
            raise ValueError(f"Missing components: {', '.join(missing)}")
    except ValueError as e:
        raise ConfigError(f"Invalid DATABASE_URL format: {e}") from e
    # Step 3: Validate port range
    if not (1 <= port <= 65535):
        raise ConfigError(f"Invalid port number: {port}")
    return DatabaseConfig(host=host, port=port, dbname=dbname, user=user, password=password)

# Test:
os.environ["DATABASE_URL"] = "postgresql://admin:secret@db.local:5432/myapp"
try:
    cfg = load_database_config()
    print(cfg)
except ConfigError as e:
    print(f"Config error: {e}")
    if e.__cause__:
        print(f" Caused by: {e.__cause__}")
Problem D — Custom Context Manager: Audit Logging
Problem: Write a context manager audit_action() that logs the start, end, duration, and success/failure of any operation, writing to a tamper-evident audit log.
import contextlib, time, hmac, hashlib, json, secrets
from datetime import datetime, timezone
AUDIT_KEY = secrets.token_bytes(32)  # in production: load from secure store

def sign_record(record: dict, key: bytes) -> str:
    """Create a signed audit record."""
    payload = json.dumps(record, sort_keys=True, default=str).encode()
    mac = hmac.new(key, payload, hashlib.sha256).hexdigest()
    return json.dumps({**record, '_mac': mac}, default=str)

@contextlib.contextmanager
def audit_action(action: str, user_id: str, resource: str, log_file: str = "audit.log"):
    record = {
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'action': action,
        'user_id': user_id,
        'resource': resource,
        'status': 'started',
    }
    start = time.perf_counter()
    try:
        yield  # run the protected code
        record['status'] = 'success'
        record['duration'] = round(time.perf_counter() - start, 4)
    except Exception as e:
        record['status'] = 'failure'
        record['error_type'] = type(e).__name__
        record['error_msg'] = str(e)
        record['duration'] = round(time.perf_counter() - start, 4)
        raise  # re-raise, don't swallow
    finally:
        # Always write the audit log entry
        signed = sign_record(record, AUDIT_KEY)
        with open(log_file, 'a') as f:
            f.write(signed + '\n')

# Usage:
with audit_action("file_download", user_id="u123", resource="/reports/q4.pdf"):
    # simulate file serving
    time.sleep(0.01)
    print("File served")
try:
    with audit_action("admin_panel", user_id="u456", resource="/admin"):
        raise PermissionError("Insufficient privileges")
except PermissionError as e:
    print(f"Blocked: {e}")
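A log is only tamper-evident if something checks the signatures later. The sketch below shows one possible verifier for records in the format produced by `sign_record`; `verify_record` is a hypothetical helper, and `sign_record` is repeated so the example runs on its own.

```python
import hashlib
import hmac
import json

def sign_record(record: dict, key: bytes) -> str:
    """Same signing scheme as above: HMAC-SHA256 over the sorted-key JSON payload."""
    payload = json.dumps(record, sort_keys=True, default=str).encode()
    mac = hmac.new(key, payload, hashlib.sha256).hexdigest()
    return json.dumps({**record, "_mac": mac}, default=str)

def verify_record(line: str, key: bytes) -> bool:
    """Recompute the MAC over everything except '_mac' and compare in constant time."""
    record = json.loads(line)
    claimed_mac = record.pop("_mac", "")
    if not isinstance(claimed_mac, str):
        return False
    payload = json.dumps(record, sort_keys=True, default=str).encode()
    expected = hmac.new(key, payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(claimed_mac.encode(), expected.encode())

demo_key = b"k" * 32  # placeholder key for the demo only
line = sign_record({"action": "file_download", "status": "success"}, demo_key)
print(verify_record(line, demo_key))                                 # True
print(verify_record(line.replace("success", "failure"), demo_key))   # False
```

Verification needs the same key that signed the record, so a key generated afresh at each start-up (as `AUDIT_KEY` is above) cannot validate older log lines. That is one more reason to load the key from a secure store.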
Problem E — Functional Pipeline: Security Event Processor
Problem: Using zip, enumerate, sorted, filter, map, any, all, and sum, build a pipeline that scores and prioritises security events.
events = [
{'type': 'login_fail', 'ip': '203.0.113.5', 'user': 'root', 'count': 50},
{'type': 'login_ok', 'ip': '10.0.0.1', 'user': 'alice', 'count': 1},
{'type': 'port_scan', 'ip': '203.0.113.5', 'user': None, 'count': 1000},
{'type': 'data_exfil', 'ip': '192.168.1.50','user': 'bob', 'count': 3},
{'type': 'login_fail', 'ip': '198.51.100.1','user': 'admin', 'count': 12},
{'type': 'config_change', 'ip': '10.0.0.5', 'user': 'carol', 'count': 1},
]
SEVERITY = {
'login_fail': 2,
'port_scan': 3,
'data_exfil': 5,
'config_change': 3,
'login_ok': 0,
}
HIGH_VALUE_USERS = {'root', 'admin', 'carol'}
def score_event(e: dict) -> int:
    base = SEVERITY.get(e['type'], 1)
    count_factor = min(e['count'] // 10, 10)  # cap at 10
    user_bonus = 2 if e.get('user') in HIGH_VALUE_USERS else 0
    return base + count_factor + user_bonus
# 1. Filter out benign events
suspicious = list(filter(lambda e: SEVERITY.get(e['type'], 1) > 0, events))
# 2. Score and annotate each event using map
scored = list(map(lambda e: {**e, 'score': score_event(e)}, suspicious))
# 3. Sort by score descending
prioritised = sorted(scored, key=lambda e: -e['score'])
# 4. Enumerate with rank for the report
report = [(rank, e) for rank, e in enumerate(prioritised, start=1)]
# 5. Summary stats using sum and any/all
total_score = sum(e['score'] for e in scored)
has_critical = any(e['score'] >= 10 for e in scored)
all_internal = all(e['ip'].startswith('10.') for e in scored)
print("=== SECURITY EVENT REPORT ===")
for rank, event in report:
    # 'user' can be present but None, so fall back with `or` instead of a .get() default
    print(f" #{rank} [{event['score']:2d}] {event['type']:<15} ip={event['ip']:<15} user={event.get('user') or 'N/A'}")
print(f"\nTotal risk score: {total_score}")
print(f"Critical events present: {has_critical}")
print(f"All internal: {all_internal}")
# 6. Group by IP using zip and dict
ips = [e['ip'] for e in scored]
scores = [e['score'] for e in scored]
from collections import defaultdict
ip_risk = defaultdict(int)
for ip, score in zip(ips, scores):
    ip_risk[ip] += score
worst_ip = max(ip_risk, key=ip_risk.get)
print(f"\nHighest risk IP: {worst_ip} (total score {ip_risk[worst_ip]})")
19. Assessment Cheat Sheet
Built-in Functions — Signatures and Returns
| Function | Signature | Returns | Notes |
|---|---|---|---|
| `zip` | `zip(*iterables, strict=False)` | lazy iterator of tuples | stops at shortest; `strict=True` raises `ValueError` on length mismatch |
| `enumerate` | `enumerate(it, start=0)` | lazy iterator of (int, item) | start defaults to 0 |
| `map` | `map(fn, *iterables)` | lazy iterator | multiple iterables = zip-like |
| `filter` | `filter(fn, iterable)` | lazy iterator | `filter(None, it)` removes falsy |
| `sum` | `sum(it, start=0)` | number | `start=[]` flattens (slow!) |
| `any` | `any(iterable)` | bool | short-circuits on first True |
| `all` | `all(iterable)` | bool | short-circuits on first False |
| `min` | `min(it, key=None, default=...)` | element | raises on empty without default |
| `max` | `max(it, key=None, default=...)` | element | key transforms before comparing |
| `sorted` | `sorted(it, key=None, reverse=False)` | new list | stable, works on any iterable |
| `reversed` | `reversed(seq)` | lazy iterator | requires `__reversed__` or a sequence (`__len__` + `__getitem__`) |
Exception Handling Rules
try:
    ...  # might raise
except Specific as e:
    ...  # handle one type
except (A, B) as e:
    ...  # handle multiple
except Exception as e:
    ...  # catch-all (NOT BaseException!)
    raise  # re-raise without losing traceback
else:
    ...  # runs ONLY if try succeeded; exceptions raised here are not caught by the excepts above
finally:
    ...  # ALWAYS runs; cleanup goes here
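To see the order in which the clauses fire, this sketch records each step in a list. The function name `trace_clauses` is invented for the example.

```python
def trace_clauses(value: str) -> list[str]:
    """Return the names of the try/except/else/finally clauses that ran."""
    steps = []
    try:
        steps.append("try")
        number = int(value)          # may raise ValueError
    except ValueError:
        steps.append("except")
    else:
        steps.append(f"else:{number}")   # only when no exception occurred
    finally:
        steps.append("finally")          # runs on both paths
    return steps

print(trace_clauses("42"))    # ['try', 'else:42', 'finally']
print(trace_clauses("oops"))  # ['try', 'except', 'finally']
```

On the success path `else` runs and `except` does not; on the failure path it is the other way round; `finally` appears in both.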
Exception Chaining
| Syntax | Effect | Use when |
|---|---|---|
| `raise X from Y` | Sets `X.__cause__ = Y` | Y is the documented root cause |
| `raise X from None` | Suppresses chain | Original reveals sensitive info |
| `raise X` (inside except) | Sets `X.__context__ = Y` | Automatic implicit chaining |
| `raise` (bare) | Re-raises current | Want to log but propagate |
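The three raising forms in the table differ only in the attributes they set on the new exception. The sketch below inspects those attributes directly; `AppError` and the helper names are hypothetical.

```python
class AppError(Exception):
    pass

def explicit():
    try:
        int("x")
    except ValueError as err:
        raise AppError("bad input") from err    # explicit cause

def implicit():
    try:
        int("x")
    except ValueError:
        raise AppError("bad input")             # implicit context only

def suppressed():
    try:
        int("x")
    except ValueError:
        raise AppError("bad input") from None   # hide the original in tracebacks

def chain_info(fn) -> tuple[str, str, bool]:
    """Return (type of __cause__, type of __context__, __suppress_context__)."""
    try:
        fn()
    except AppError as err:
        return (type(err.__cause__).__name__,
                type(err.__context__).__name__,
                err.__suppress_context__)

print(chain_info(explicit))    # ('ValueError', 'ValueError', True)
print(chain_info(implicit))    # ('NoneType', 'ValueError', False)
print(chain_info(suppressed))  # ('NoneType', 'ValueError', True)
```

Note that `from None` does not erase `__context__`; it sets `__suppress_context__` so the traceback display omits the original exception. Code with access to the exception object can still read it.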
Security Quick Reference
| Risk | Vulnerable Code | Safe Code |
|---|---|---|
| SQL Injection | `f"WHERE name='{name}'"` | `cursor.execute("WHERE name=?", (name,))` |
| Command Injection | `os.system(f"ls {path}")` | `subprocess.run(["ls", path])` |
| Path Traversal | `open(base + user_path)` | Resolve + verify prefix |
| Password Storage | `sha256(password)` | `bcrypt.hashpw(pw, gensalt(12))` |
| Random Tokens | `random.randint(...)` | `secrets.token_hex(32)` |
| Code Execution | `eval(user_input)` | `ast.literal_eval(...)` |
| Pickle RCE | `pickle.loads(untrusted)` | `json.loads(untrusted)` |
| Token Comparison | `a == b` | `hmac.compare_digest(a, b)` |
| User Enumeration | Different error per case | Same error always |
| Secret Storage | Hardcoded in source | `os.environ["SECRET"]` |
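The "Resolve + verify prefix" entry is the only row without code, so here is one possible shape for it using `pathlib`. The function name `resolve_inside` and the example directory are assumptions made for illustration.

```python
from pathlib import Path

def resolve_inside(base_dir: str, user_path: str) -> Path:
    """Resolve user_path under base_dir and refuse anything that escapes it."""
    base = Path(base_dir).resolve()
    candidate = (base / user_path).resolve()   # collapses '..' and follows symlinks
    if not candidate.is_relative_to(base):     # Path.is_relative_to needs Python 3.9+
        raise PermissionError(f"Path escapes base directory: {user_path!r}")
    return candidate

UPLOAD_DIR = "/srv/app/uploads"  # hypothetical base directory
print(resolve_inside(UPLOAD_DIR, "reports/q4.pdf"))
for attempt in ("../../etc/passwd", "/etc/passwd"):
    try:
        resolve_inside(UPLOAD_DIR, attempt)
    except PermissionError as exc:
        print(f"Blocked: {exc}")
```

Comparing resolved paths, instead of checking the raw string for `..`, also catches absolute paths and symlinks that point outside the base directory.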
Python 3.13 Specifics
| Feature | Details |
|---|---|
| `input()` | Always returns `str` and never evaluates its input (true since Python 3.0, unchanged in 3.13) |
| New REPL | Multi-line edit, F1=help, F2=clean copy, F3=paste mode |
| Free-threaded | Experimental `--disable-gil` build; use explicit locks |
| Error messages | Colourised tracebacks by default; the `NameError`/`AttributeError` suggestions (from 3.10) now extend to mistyped keyword arguments |
| `type X = ...` | Type alias statement (introduced in 3.12) |
| `def fn[T](...)` | Inline type parameter syntax (introduced in 3.12) |
Lambda Patterns
# ✅ Use lambda for: short, inline, anonymous key/transform functions
sorted(data, key=lambda d: (d['priority'], d['name']))
filter(lambda x: x > 0, data)
max(users, key=lambda u: u.score)
# ❌ Avoid lambda for: complex logic, named reusable functions, multi-line
square = lambda x: x**2 # use def instead
# Closure gotcha — capture loop variable
# ❌ Bug: [lambda x: x*n for n in range(5)]
# ✅ Fixed: [lambda x, n=n: x*n for n in range(5)]
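Running the two closure variants side by side makes the gotcha visible. The sketch also shows `functools.partial` as an alternative to the default-argument trick; the variable names are illustrative.

```python
import operator
from functools import partial

# Each lambda looks up n when it is CALLED, by which time the loop has finished (n == 4).
buggy = [lambda x: x * n for n in range(5)]
# The default argument n=n is evaluated when the lambda is CREATED, freezing each value.
fixed = [lambda x, n=n: x * n for n in range(5)]
# partial binds the current n as the first argument of operator.mul.
bound = [partial(operator.mul, n) for n in range(5)]

print([f(10) for f in buggy])  # [40, 40, 40, 40, 40]
print([f(10) for f in fixed])  # [0, 10, 20, 30, 40]
print([f(10) for f in bound])  # [0, 10, 20, 30, 40]
```

`partial` has the extra benefit that callers cannot accidentally override the bound value by passing a second argument named `n`.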
Common Mistakes in Assessments
- `any([])` returns `False`; `all([])` returns `True`: know these edge cases
- `zip` stops at the shortest iterable: use `itertools.zip_longest` if you want all elements
- `map`/`filter` return iterators: wrap in `list()` if you need to index or iterate multiple times
- `sorted()` returns a list; `.sort()` returns `None`: don't do `x = lst.sort()`
- `lambda` in a loop captures the variable, not its value: use default argument `n=n`
- Bare `except:` catches `SystemExit` and Ctrl+C (`KeyboardInterrupt`): always use `except Exception:`
- `else` in try/except runs only when NO exception occurred: not the same as code after the try block
- `raise ... from None` suppresses the exception chain: know when to use it
- `input()` always returns `str`: explicit conversion required, e.g. `int(input("N: "))`
- Use `hmac.compare_digest`, not `==`, for all security-sensitive comparisons