German Yamil
Python Context Managers: The with Statement Beyond File Handling

You already know with open("file.txt") as f:. That's a context manager.

What most tutorials skip is that you can write your own, and that the pattern applies to far more than files: timers, database connections, temporary directories, logging, locks, and any resource that needs setup and guaranteed cleanup.


๐ŸŽ Free: AI Publishing Checklist โ€” 7 steps in Python ยท Full pipeline: germy5.gumroad.com/l/xhxkzz (pay what you want, min $9.99)


What with actually does

# This:
with open("data.json") as f:
    content = f.read()

# Is equivalent to:
f = open("data.json")
try:
    content = f.read()
finally:
    f.close()   # always runs, even if an exception occurs

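The with block guarantees that cleanup runs whether the block exits normally, returns early, or raises an exception, including a KeyboardInterrupt from Ctrl+C. It cannot help if the process is killed outright (for example by SIGKILL or a power loss), because no Python code runs at that point. That guarantee is the entire point.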
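To watch the cleanup step run, here is a minimal sketch built around a hypothetical Resource class (an illustration, not part of the pipeline) that records each stage of its lifecycle. The exit step is logged even though the block raises.

```python
class Resource:
    """Hypothetical resource that records its lifecycle events."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Called on normal exit and when the block raises.
        self.events.append("exit")
        return False  # let any exception propagate


res = Resource()
try:
    with res:
        res.events.append("body")
        raise ValueError("boom")
except ValueError:
    res.events.append("handled")

print(res.events)  # ['enter', 'body', 'exit', 'handled']
```

Note that "exit" is recorded before "handled": the cleanup happens while the exception is still on its way out of the block.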

Built-in context managers you already have

import threading
import tempfile
import os

# Files (most common)
with open("output.txt", "w") as f:
    f.write("hello")

# Temporary directories: deleted automatically on exit
with tempfile.TemporaryDirectory() as tmpdir:
    script = os.path.join(tmpdir, "task.py")
    with open(script, "w") as f:
        f.write("print('hi')")
    # tmpdir and all its contents are deleted here

# Temporary files (delete=False keeps the file after the block; remove it yourself)
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as tmp:
    tmp.write("print('hello')")
    path = tmp.name

# Thread locks
shared_counter = 0
lock = threading.Lock()
with lock:
    # only one thread runs this at a time
    shared_counter += 1
# lock released automatically

# Suppress specific exceptions
from contextlib import suppress
with suppress(FileNotFoundError):
    os.remove("might_not_exist.txt")
# continues normally if file doesn't exist
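One more standard-library manager that fits the same list, shown as a brief sketch: contextlib.redirect_stdout temporarily points sys.stdout at another file-like object and restores it on exit. The buffer and captured names are only for illustration.

```python
import io
from contextlib import redirect_stdout

buffer = io.StringIO()
with redirect_stdout(buffer):
    print("captured")  # written to buffer instead of the terminal

# sys.stdout is restored here
captured = buffer.getvalue()
print(repr(captured))  # 'captured\n'
```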

Writing your own: contextlib.contextmanager

The easiest way to create a context manager is with the @contextmanager decorator:

from contextlib import contextmanager
import time

@contextmanager
def timer(label: str):
    """Measure and print execution time of a block."""
    start = time.perf_counter()
    try:
        yield   # code inside `with` block runs here
    finally:
        elapsed = time.perf_counter() - start
        print(f"[{label}] {elapsed:.3f}s")

# Usage
with timer("LLM call"):
    # ... call the API ...
    time.sleep(0.1)  # simulate work
# prints: [LLM call] 0.100s

The structure is always setup → yield → cleanup. The yield can also hand a value to the block, which becomes the as variable:

@contextmanager
def temp_script(code: str):
    """Write code to a temp file, yield the path, delete on exit."""
    import tempfile, os
    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=".py", delete=False, encoding="utf-8"
    )
    try:
        tmp.write(code)
        tmp.close()
        yield tmp.name      # <-- this becomes the `as` variable
    finally:
        os.unlink(tmp.name) # always deleted

# Usage
with temp_script("print('hello world')") as path:
    import subprocess
    result = subprocess.run(["python3", path], capture_output=True, text=True)
    print(result.stdout)  # hello world
# file deleted here automatically
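The try/finally around the yield is what makes the cleanup reliable. The sketch below compares two hypothetical managers, no_finally and with_finally (names chosen for illustration), when the block raises.

```python
from contextlib import contextmanager

events = []
results = {}

@contextmanager
def no_finally():
    events.append("setup")
    yield
    events.append("cleanup")  # skipped if the block raises

@contextmanager
def with_finally():
    events.append("setup")
    try:
        yield
    finally:
        events.append("cleanup")  # runs whether or not the block raises

for manager in (no_finally, with_finally):
    events.clear()
    try:
        with manager():
            raise ValueError("boom")
    except ValueError:
        pass
    results[manager.__name__] = list(events)

print(results)
# {'no_finally': ['setup'], 'with_finally': ['setup', 'cleanup']}
```

The exception from the block is re-raised inside the generator at the yield, so any code after a bare yield is never reached.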

Class-based context managers

For more complex setup/teardown, implement __enter__ and __exit__:

class ManagedDB:
    """Database connection that always closes, even on error."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = None

    def __enter__(self):
        import sqlite3
        self.conn = sqlite3.connect(self.db_path)
        return self.conn          # becomes the `as` variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            if exc_type is None:
                self.conn.commit()  # commit on success
            else:
                self.conn.rollback()  # rollback on error
            self.conn.close()
        return False  # don't suppress exceptions

# Usage
with ManagedDB("pipeline.db") as db:
    db.execute("INSERT INTO tasks VALUES (?, ?)", ("t01", "done"))
# committed and closed automatically

__exit__ receives the exception info if one occurred. Return True to suppress it, False (or None) to let it propagate.
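As a small sketch of that return value, the hypothetical IgnoreMissingKey manager below suppresses KeyError and lets everything else propagate. The class and variable names are assumptions made for this example.

```python
class IgnoreMissingKey:
    """Hypothetical manager that swallows KeyError and nothing else."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exc_type is None when the block finished without raising.
        return exc_type is not None and issubclass(exc_type, KeyError)


settings = {"debug": True}

with IgnoreMissingKey():
    value = settings["missing"]  # KeyError is suppressed
reached = True  # execution continues after the block

propagated = False
try:
    with IgnoreMissingKey():
        int("not a number")  # ValueError is not suppressed
except ValueError:
    propagated = True

print(reached, propagated)  # True True
```

Suppressing is best kept narrow: returning True unconditionally would hide every error raised inside the block.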

Pattern: logging context

from contextlib import contextmanager
import logging

log = logging.getLogger(__name__)

@contextmanager
def task_context(task_id: str, task_name: str):
    """Log task start/end, catch and log errors."""
    log.info(f"[{task_id}] Starting: {task_name}")
    try:
        yield
        log.info(f"[{task_id}] ✅ Complete: {task_name}")
    except Exception as e:
        log.error(f"[{task_id}] ❌ Failed: {task_name}: {e}")
        raise  # re-raise so the caller can handle it

# Usage
with task_context("ch03", "Generate chapter"):
    content = generate_chapter(outline["ch03"])
    save_to_file(content, "chapters/ch03.md")

Pattern: atomic file writes

from contextlib import contextmanager
import os
import tempfile

@contextmanager
def atomic_write(path: str, mode: str = "w", **kwargs):
    """
    Write to a temp file; rename to target on success.
    If an error occurs, the original file is untouched.
    """
    dir_name = os.path.dirname(os.path.abspath(path))
    tmp = tempfile.NamedTemporaryFile(
        mode=mode, dir=dir_name, delete=False, **kwargs
    )
    try:
        yield tmp
        tmp.close()
        os.replace(tmp.name, path)   # atomic on POSIX
    except BaseException:  # also covers KeyboardInterrupt, so the temp file never leaks
        tmp.close()
        os.unlink(tmp.name)          # discard incomplete file
        raise

# Usage
with atomic_write("state.json", encoding="utf-8") as f:
    import json
    json.dump({"task-01": "done"}, f, indent=2)
# state.json is only updated if no exception occurred

Pattern: retrying context

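import time

# A context manager cannot re-run its `with` body, and a @contextmanager
# generator may yield only once, so the loop has to live in the caller.
# Each pass of the loop gets its own small context manager.

class _Attempt:
    """One attempt; swallows retryable errors unless it is the last try."""

    def __init__(self, number, is_last, delay, exceptions):
        self.number, self.is_last = number, is_last
        self.delay, self.exceptions = delay, exceptions
        self.succeeded = False

    def __enter__(self):
        return self.number

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self.succeeded = True
            return False
        if self.is_last or not issubclass(exc_type, self.exceptions):
            return False  # out of attempts, or not retryable: propagate
        print(f"Attempt {self.number} failed: {exc_val}. Retrying in {self.delay}s...")
        time.sleep(self.delay)
        return True  # suppress so the for loop can try again

def retry_on_error(max_attempts: int = 3, delay: float = 1.0, exceptions=(Exception,)):
    """Yield one context manager per attempt, stopping after the first success."""
    for number in range(1, max_attempts + 1):
        attempt = _Attempt(number, number == max_attempts, delay, exceptions)
        yield attempt
        if attempt.succeeded:
            return
        delay *= 2  # exponential backoff

# Usage
for attempt in retry_on_error(max_attempts=3, exceptions=(TimeoutError, ConnectionError)):
    with attempt as n:
        print(f"Attempt {n}")
        result = call_api()  # retried on TimeoutError or ConnectionError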

Multiple context managers in one with

# Old style (nested)
with open("input.txt") as src:
    with open("output.txt", "w") as dst:
        dst.write(src.read().upper())

# Modern style (single with, comma-separated)
with open("input.txt") as src, open("output.txt", "w") as dst:
    dst.write(src.read().upper())

# With your custom managers
with timer("full pipeline"), task_context("run", "Pipeline"):
    run_pipeline()
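When several managers share one with, they are entered left to right and exited in reverse order, exactly as if they were nested. A minimal sketch, using a hypothetical tracked manager that records the order:

```python
from contextlib import contextmanager

order = []

@contextmanager
def tracked(name: str):
    """Hypothetical manager that logs when it is entered and exited."""
    order.append(f"enter {name}")
    try:
        yield
    finally:
        order.append(f"exit {name}")

with tracked("outer"), tracked("inner"):
    order.append("body")

print(order)
# ['enter outer', 'enter inner', 'body', 'exit inner', 'exit outer']
```

On Python 3.10 and later, the comma-separated managers can also be wrapped in parentheses and split across lines.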

contextlib.ExitStack: dynamic context managers

When you don't know until runtime how many context managers you need:

from contextlib import ExitStack

chapters = ["ch01.md", "ch02.md", "ch03.md"]

with ExitStack() as stack:
    # Open all files, register them all for cleanup
    file_handles = [
        stack.enter_context(open(f"chapters/{ch}"))
        for ch in chapters
    ]
    # all files open here
    for fh in file_handles:
        print(fh.read()[:50])
# all files closed here, even if some failed
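ExitStack can also register plain callables with callback(), which helps when the cleanup step is not itself a context manager. A short sketch, with the list name and labels chosen only for illustration, showing that cleanups run in reverse order of registration:

```python
from contextlib import ExitStack

cleanup_log = []

with ExitStack() as stack:
    for name in ["a", "b", "c"]:
        # callback() registers any callable to run when the stack unwinds
        stack.callback(cleanup_log.append, f"cleanup {name}")
    cleanup_log.append("body")

print(cleanup_log)
# ['body', 'cleanup c', 'cleanup b', 'cleanup a']
```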

Real pipeline usage

This is how the publishing pipeline uses context managers in practice:

from contextlib import contextmanager
import tempfile, os, subprocess, time, logging

log = logging.getLogger(__name__)

@contextmanager
def isolated_execution(code: str, timeout: int = 30):
    """
    Run code in a temp directory. Yields (stdout, stderr, returncode).
    Cleans up temp dir regardless of outcome.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        script = os.path.join(tmpdir, "task.py")
        with open(script, "w", encoding="utf-8") as f:
            f.write(code)

        result = subprocess.run(
            ["python3", script],
            capture_output=True, text=True,
            timeout=timeout, cwd=tmpdir,
        )
        yield result.stdout, result.stderr, result.returncode
        # tmpdir deleted here


# Clean calling code:
code = "for i in range(5): print(i)"

with isolated_execution(code) as (stdout, stderr, rc):
    if rc == 0:
        log.info(f"Output: {stdout.strip()}")
    else:
        log.error(f"Failed: {stderr.strip()}")

The pipeline wraps every subprocess execution and file write in context managers, with zero leaked resources across a 10-chapter ebook generation: germy5.gumroad.com/l/xhxkzz (pay what you want, min $9.99).

