Python Context Managers: The with Statement Beyond File Handling
You already know with open("file.txt") as f:. That's a context manager.
What most tutorials skip: you can write your own, and the pattern applies to far more than files — timers, database connections, temporary directories, logging, locks, and any resource that needs setup and guaranteed cleanup.
Free: AI Publishing Checklist — 7 steps in Python · Full pipeline: germy5.gumroad.com/l/xhxkzz (pay what you want, min $9.99)
What with actually does
# This:
with open("data.json") as f:
    content = f.read()

# Is equivalent to:
f = open("data.json")
try:
    content = f.read()
finally:
    f.close()  # always runs, even if an exception occurs
The with block guarantees cleanup runs — whether the block exits normally, raises an exception, or is interrupted with Ctrl-C (KeyboardInterrupt). That's the entire point.
Built-in context managers you already have
import threading
import tempfile
import os
from contextlib import suppress

# Files (most common)
with open("output.txt", "w") as f:
    f.write("hello")

# Temporary directories — deleted automatically on exit
with tempfile.TemporaryDirectory() as tmpdir:
    script = os.path.join(tmpdir, "task.py")
    with open(script, "w") as f:
        f.write("print('hi')")
# tmpdir and all its contents are deleted here

# Temporary files (delete=False: the file survives the block so we can reuse it)
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as tmp:
    tmp.write("print('hello')")
    path = tmp.name

# Thread locks
shared_counter = 0  # must be bound before `+=` below, or NameError
lock = threading.Lock()
with lock:
    # only one thread runs this at a time
    shared_counter += 1
# lock released automatically

# Suppress specific exceptions
with suppress(FileNotFoundError):
    os.remove("might_not_exist.txt")
# continues normally if file doesn't exist
Writing your own: contextlib.contextmanager
The easiest way to create a context manager is with the @contextmanager decorator:
from contextlib import contextmanager
import time


@contextmanager
def timer(label: str):
    """Print how long the `with` block took, tagged with *label*."""
    started = time.perf_counter()
    try:
        yield  # the body of the `with` statement runs here
    finally:
        # finally: the timing prints on normal exit AND on exception
        print(f"[{label}] {time.perf_counter() - started:.3f}s")


# Usage
with timer("LLM call"):
    # ... call the API ...
    time.sleep(0.1)  # simulate work
# prints: [LLM call] 0.100s
The structure is always: setup → yield → cleanup. The yield can optionally return a value (the as variable):
@contextmanager
def temp_script(code: str):
    """Write *code* to a temporary .py file, yield its path, remove it on exit."""
    import tempfile, os
    handle = tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False, encoding="utf-8"
    )
    try:
        handle.write(code)
        handle.close()
        yield handle.name  # <-- this becomes the `as` variable
    finally:
        os.unlink(handle.name)  # removed no matter how the block exits


# Usage
with temp_script("print('hello world')") as path:
    import subprocess
    proc = subprocess.run(["python3", path], capture_output=True, text=True)
    print(proc.stdout)  # hello world
# file deleted here automatically
Class-based context managers
For more complex setup/teardown, implement __enter__ and __exit__:
class ManagedDB:
    """SQLite connection that commits on success, rolls back on error,
    and always closes — even if the block raises."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = None  # opened lazily in __enter__

    def __enter__(self):
        import sqlite3
        self.conn = sqlite3.connect(self.db_path)
        return self.conn  # becomes the `as` variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            if exc_type is None:
                self.conn.commit()    # commit on success
            else:
                self.conn.rollback()  # rollback on error
            self.conn.close()
        return False  # don't suppress exceptions


# Usage
with ManagedDB("pipeline.db") as db:
    # the table must exist before the INSERT can succeed
    db.execute("CREATE TABLE IF NOT EXISTS tasks (id TEXT, status TEXT)")
    db.execute("INSERT INTO tasks VALUES (?, ?)", ("t01", "done"))
# committed and closed automatically
__exit__ receives the exception info if one occurred. Return True to suppress it, False (or None) to let it propagate.
Pattern: logging context
from contextlib import contextmanager
import logging

log = logging.getLogger(__name__)


@contextmanager
def task_context(task_id: str, task_name: str):
    """Log task start/end; log and re-raise any error from the block."""
    log.info(f"[{task_id}] Starting: {task_name}")
    try:
        yield
        # only reached if the block completed without raising
        log.info(f"[{task_id}] Complete: {task_name}")
    except Exception as e:
        log.error(f"[{task_id}] Failed: {task_name} - {e}")
        raise  # re-raise so the caller can handle it


# Usage (generate_chapter / save_to_file are pipeline helpers defined elsewhere)
with task_context("ch03", "Generate chapter"):
    content = generate_chapter(outline["ch03"])
    save_to_file(content, "chapters/ch03.md")
Pattern: atomic file writes
from contextlib import contextmanager
import os
import tempfile


@contextmanager
def atomic_write(path: str, mode: str = "w", **kwargs):
    """
    Write to a scratch file in the target's directory; rename over the
    target only on success, so readers never see a half-written file.
    """
    target_dir = os.path.dirname(os.path.abspath(path))
    scratch = tempfile.NamedTemporaryFile(
        mode=mode, dir=target_dir, delete=False, **kwargs
    )
    try:
        yield scratch
        scratch.close()
        os.replace(scratch.name, path)  # atomic on POSIX
    except Exception:
        scratch.close()          # closing twice is harmless
        os.unlink(scratch.name)  # throw away the incomplete file
        raise


# Usage
with atomic_write("state.json", encoding="utf-8") as f:
    import json
    json.dump({"task-01": "done"}, f, indent=2)
# state.json is only updated if no exception occurred
Pattern: retrying context
from contextlib import contextmanager
import time


def retry_on_error(max_attempts: int = 3, delay: float = 1.0, exceptions=(Exception,)):
    """
    Yield one context manager per attempt; stop after the first success.

    Why not a single `with retry_on_error(): ...`? A `with` block executes
    exactly once, and a @contextmanager generator must yield exactly once —
    if it catches the thrown-in exception and yields again to "retry",
    contextlib raises RuntimeError("generator didn't stop after throw()").
    The retry loop therefore has to live in the CALLER, tenacity-style:

        for attempt in retry_on_error(...):
            with attempt as n:
                ...  # re-executed until it succeeds or attempts run out
    """
    state = {"raised": False}
    pause = delay

    @contextmanager
    def _attempt(number: int):
        state["raised"] = False
        try:
            yield number  # attempt number becomes the `as` variable
        except exceptions as e:
            state["raised"] = True
            if number == max_attempts:
                raise  # out of retries: propagate to the caller
            print(f"Attempt {number} failed: {e}. Retrying in {pause}s...")

    for number in range(1, max_attempts + 1):
        yield _attempt(number)
        if not state["raised"]:
            break  # success — stop producing attempts
        time.sleep(pause)
        pause *= 2  # exponential backoff


# Usage — note the for-loop driving the attempts
def call_api():
    """Demo stub: times out on the first call, then succeeds."""
    if not hasattr(call_api, "warmed_up"):
        call_api.warmed_up = True
        raise TimeoutError("simulated timeout")
    return "ok"


for attempt in retry_on_error(max_attempts=3, delay=0.1,
                              exceptions=(TimeoutError, ConnectionError)):
    with attempt as n:
        print(f"Attempt {n}")
        result = call_api()  # retried on TimeoutError or ConnectionError
Multiple context managers in one with
# Old style (nested)
with open("input.txt") as src:
    with open("output.txt", "w") as dst:
        dst.write(src.read().upper())

# Modern style (single with, comma-separated)
with open("input.txt") as src, open("output.txt", "w") as dst:
    dst.write(src.read().upper())

# With your custom managers
with timer("full pipeline"), task_context("run", "Pipeline"):
    run_pipeline()
contextlib.ExitStack — dynamic context managers
When you don't know at compile time how many context managers you need:
from contextlib import ExitStack

chapters = ["ch01.md", "ch02.md", "ch03.md"]

with ExitStack() as stack:
    # Open every chapter and register each handle for cleanup
    file_handles = [
        stack.enter_context(open(f"chapters/{ch}"))
        for ch in chapters
    ]
    # all files open here
    for fh in file_handles:
        print(fh.read()[:50])
# all files closed here, even if some failed
Real pipeline usage
This is how the publishing pipeline uses context managers in practice:
from contextlib import contextmanager
import tempfile, os, subprocess, time, logging, sys

log = logging.getLogger(__name__)


@contextmanager
def isolated_execution(code: str, timeout: int = 30):
    """
    Run *code* in a fresh temp directory. Yields (stdout, stderr, returncode).

    The temp directory — and anything the script wrote into it — is removed
    when the block exits, success or failure. Raises
    subprocess.TimeoutExpired if the script outlives *timeout* seconds.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        script = os.path.join(tmpdir, "task.py")
        with open(script, "w", encoding="utf-8") as f:
            f.write(code)
        # sys.executable, not "python3": correct on Windows and inside venvs
        result = subprocess.run(
            [sys.executable, script],
            capture_output=True, text=True,
            timeout=timeout, cwd=tmpdir,
        )
        yield result.stdout, result.stderr, result.returncode
    # tmpdir deleted here


# Clean calling code:
code = "for i in range(5): print(i)"
with isolated_execution(code) as (stdout, stderr, rc):
    if rc == 0:
        log.info(f"Output: {stdout.strip()}")
    else:
        log.error(f"Failed: {stderr.strip()}")
The pipeline wraps every subprocess execution and file write in context managers — zero leaked resources across 10-chapter ebook generation: germy5.gumroad.com/l/xhxkzz — pay what you want, min $9.99.
Top comments (0)