aykhlf yassir

Python Internals: Generators & Coroutines

There's a function that exists in almost every Python codebase. It looks harmless:

def get_trades(symbol: str) -> list[dict]:
    results = []
    for record in enormous_database_cursor:
        if record["symbol"] == symbol:
            results.append(record)
    return results

trades = get_trades("AAPL")  # πŸ’€ Waits. Waits. Waits. Then crashes.

The problems stack up fast:

  • High latency: You get zero items until the entire database is scanned
  • Massive RAM: Every matching record is held in memory simultaneously
  • Fragility: One spike in result size kills the process

This is the "eager" patternβ€”do all the work, collect all the results, then hand them over. For small datasets, you'll never notice. For anything real-world, it's a time bomb.

The fix is a single keyword. But to use it correctly, you need to understand what it actually does to your function.
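As a preview, here is the shape of that fix, sketched with the cursor passed in as an explicit parameter (a small change from the snippet above so the example is self-contained):

```python
from typing import Iterator

def get_trades(symbol: str, cursor) -> Iterator[dict]:
    """Yield matching trades one at a time instead of building a list."""
    for record in cursor:
        if record["symbol"] == symbol:
            yield record  # Hand back one record, then pause

# The caller sees the first match immediately -- no full scan, no big list
records = [{"symbol": "AAPL", "qty": 10}, {"symbol": "MSFT", "qty": 5}]
first = next(get_trades("AAPL", records))
print(first)  # {'symbol': 'AAPL', 'qty': 10}
```

Memory stays constant no matter how many records match, and the first result arrives as soon as it is found.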


1. The Basics: What yield Does to a Function

Every Python function you've written follows the same lifecycle:

Call β†’ Execute β†’ return value β†’ Stack frame is destroyed β†’ Done

Local variables evaporate. State is gone. The function has no memory that it ever ran.

yield breaks this contract entirely.

The Normal Function: A Sprint

def countdown_list(n: int) -> list[int]:
    result = []
    while n > 0:
        result.append(n)
        n -= 1
    return result  # Hands you everything at once, then dies

One call. One massive result. The function's stack frame is created, used, and destroyed.

The Generator: A Pause Button

def countdown(n: int):
    while n > 0:
        yield n    # Pause here, hand back n, wait to be resumed
        n -= 1

The moment Python sees yield in a function body, the rules change. Calling countdown(5) no longer executes a single line of code. Instead, Python hands you back a generator objectβ€”a suspended, ready-to-run machine.

gen = countdown(5)
print(gen)          # <generator object countdown at 0x7f...>
print(next(gen))    # 5  β†’ Runs until yield, pauses, returns 5
print(next(gen))    # 4  β†’ Resumes, runs until yield, pauses, returns 4
print(next(gen))    # 3  β†’ Same

What makes this possible? When a generator pauses at yield, its entire stack frameβ€”local variables, current line number, the value of nβ€”is moved from the stack to the heap. It doesn't disappear. It waits, frozen in time, until next() is called again.

Normal function:    [Stack Frame] β†’ return β†’ [Destroyed]

Generator:          [Stack Frame] β†’ yield β†’ [Moved to Heap, frozen]
                                          ↓
                    next() called  β†’ [Thawed, execution resumes]
                                          ↓
                                   yield β†’ [Frozen again]
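You can watch this freeze/thaw cycle from the outside with the standard library's inspect module:

```python
import inspect

def countdown(n: int):
    while n > 0:
        yield n
        n -= 1

gen = countdown(5)
print(inspect.getgeneratorstate(gen))  # GEN_CREATED   -- no code has run yet
next(gen)
print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED -- frozen at yield
print(gen.gi_frame.f_lineno)           # The line number the frame is paused on
list(gen)                              # Exhaust the generator
print(inspect.getgeneratorstate(gen))  # GEN_CLOSED    -- frame is gone
```

`gi_frame` is the suspended stack frame itself, kept alive on the heap between calls; once the generator finishes, it is released and the state flips to `GEN_CLOSED`.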

Old Way vs. New Way: Side by Side

Before generators, you had to implement the iterator protocol manuallyβ€”a verbose class with __iter__ and __next__:

# THE OLD WAY: Class-based iterator (20 lines of boilerplate)
class Countdown:
    def __init__(self, start: int) -> None:
        self.current = start

    def __iter__(self):
        return self

    def __next__(self) -> int:
        if self.current <= 0:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value

# THE NEW WAY: Generator function (4 lines, zero boilerplate)
def countdown(n: int):
    while n > 0:
        yield n
        n -= 1

Same behavior. Same memory efficiency. Same protocol compatibilityβ€”countdown(5) works anywhere Countdown(5) does.

The yield keyword gives you a fully implemented, class-based iterator in a single line.
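That equivalence is easy to verify (both versions are re-declared from above so the snippet stands alone):

```python
class Countdown:
    def __init__(self, start: int) -> None:
        self.current = start
    def __iter__(self):
        return self
    def __next__(self) -> int:
        if self.current <= 0:
            raise StopIteration
        value, self.current = self.current, self.current - 1
        return value

def countdown(n: int):
    while n > 0:
        yield n
        n -= 1

# Same values from any consumer of the iterator protocol
print(list(Countdown(5)))   # [5, 4, 3, 2, 1]
print(list(countdown(5)))   # [5, 4, 3, 2, 1]
print(sum(Countdown(100)) == sum(countdown(100)))  # True
```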


2. Infinite Data Pipelines: The "Pull" Model

Here's where generators move from "interesting" to "indispensable."

Consider the difference:

import sys

# Eager: allocate the entire sequence in RAM
big_list = [x ** 2 for x in range(1_000_000)]
print(f"List size:      {sys.getsizeof(big_list):>12,} bytes")  
# List size:       8,448,728 bytes (~8 MB)

# Lazy: a tiny object that knows *how* to produce values
big_gen = (x ** 2 for x in range(1_000_000))
print(f"Generator size: {sys.getsizeof(big_gen):>12,} bytes")
# Generator size:          104 bytes

Eight megabytes vs. 104 bytes. The generator doesn't store the squaresβ€”it stores the recipe for producing the next one. Scale this to 10GB of log files or a live market feed, and this difference is what separates a working system from a crashed one.

Generator Expressions: Lazy List Comprehensions

The (x for x in ...) syntax is a generator expressionβ€”the lazy sibling of list comprehensions.

# List comprehension: eager, executes immediately
squares_list = [x ** 2 for x in range(10)]    # All 10 computed NOW

# Generator expression: lazy, executes on demand
squares_gen  = (x ** 2 for x in range(10))    # NONE computed yet

Square brackets β†’ list (eager). Parentheses β†’ generator (lazy).
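One syntactic nicety: when a generator expression is the only argument to a function call, the extra parentheses can be dropped:

```python
# The call's own parentheses double as the generator expression's
total = sum(x ** 2 for x in range(10))
print(total)  # 285

# sum, max, min, any, all consume the stream lazily -- O(1) memory throughout
print(any(x > 8 for x in range(10)))  # True
```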

The Pipeline Architecture

Generators compose naturally into pipelinesβ€”a chain of lazy transformations where data flows through only when pulled from the end:

  Source                  Filter                      Processor
    β”‚                       β”‚                            β”‚
market_ticker() ──► (t for t in ticker          ──► trading logic
[infinite stream]    if t['symbol'] == 'AAPL')      (processes one
                    [lazy filter]                    tick at a time)
import itertools
import random

def market_ticker():
    """Simulates an infinite stream of market data."""
    symbols = ['AAPL', 'GOOG', 'MSFT', 'AMZN']
    for _ in itertools.count():
        yield {
            'symbol': random.choice(symbols),
            'price': round(random.uniform(100, 300), 2),
            'volume': random.randint(100, 10000),
        }

# Build the pipeline β€” nothing executes yet
ticker    = market_ticker()                             # Source: infinite generator
aapl_only = (t for t in ticker if t['symbol'] == 'AAPL') # Filter: lazy expression

# Data flows ONLY when we pull from the end
tick = next(aapl_only)  # NOW it runs: pulls from ticker until it finds AAPL
print(tick)  # {'symbol': 'AAPL', 'price': 172.34, 'volume': 4821}

Nothing ran when we built the pipeline. No data was fetched, no filtering occurred. The entire chain is dormant until we call next(). This is lazy evaluationβ€”the pipeline pulls data through only as fast as you consume it.

This is how you process a terabyte log file with 104 bytes of working memory.
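itertools composes naturally with this pull model. For instance, islice takes a bounded slice of an otherwise infinite pipeline (market_ticker here is a simplified stand-in for the version above):

```python
import itertools
import random

def market_ticker():
    """Infinite stream of random market ticks."""
    symbols = ['AAPL', 'GOOG', 'MSFT', 'AMZN']
    while True:
        yield {'symbol': random.choice(symbols),
               'price': round(random.uniform(100, 300), 2)}

# Lazy filter over an infinite source
aapl_only = (t for t in market_ticker() if t['symbol'] == 'AAPL')

# Pull exactly 3 AAPL ticks, then stop -- the source never runs ahead
for tick in itertools.islice(aapl_only, 3):
    print(tick['price'])
```

islice never materializes the stream; it simply stops pulling after three items, so the infinite source stays dormant beyond that point.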


3. The Deep Dive: Generators as Coroutines

Everything above treats generators as producers: you pull data out of them via next().

But generators can also be consumers: you push data into them via .send(). This transforms a generator from a simple stream into a stateful processing unitβ€”what computer scientists call a coroutine.

yield as an Expression

Normally, yield value is a statementβ€”it sends a value out. But it can also be an expression that receives a value:

def accumulator():
    total = 0
    while True:
        value = (yield total)  # Pause: send out total, wait to receive a value
        if value is not None:
            total += value

acc = accumulator()
next(acc)        # Prime the coroutine (advance to first yield)
acc.send(10)     # Push 10 in β†’ total becomes 10
acc.send(20)     # Push 20 in β†’ total becomes 30
result = acc.send(5)
print(result)    # 35

The priming step (next(acc)) is required. A fresh generator is frozen at the start of the function, before any yield has been reached. You must advance it to the first yield before you can send anything to it.
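The priming step is easy to forget, so a common remedy (a pattern popularized by David Beazley's coroutine course) is a small decorator that primes the coroutine at construction time:

```python
import functools

def coroutine(func):
    """Decorator: call the generator function and advance it to the first yield."""
    @functools.wraps(func)
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # Prime: run up to the first yield
        return gen
    return primer

@coroutine
def accumulator():
    total = 0
    while True:
        value = (yield total)
        if value is not None:
            total += value

acc = accumulator()   # Already primed -- no explicit next() needed
print(acc.send(10))   # 10
print(acc.send(20))   # 30
```

Every coroutine created through the decorator is immediately ready for .send(), which removes a whole class of "can't send non-None value to a just-started generator" errors.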

The Four Generator Controls

Operation   Syntax               What it does
Pull        next(gen)            Resume, run until the next yield, return the yielded value
Push        gen.send(val)        Resume with val as the result of yield, run until the next yield
Throw       gen.throw(ExcType)   Resume by raising an exception at the paused yield
Close       gen.close()          Throw GeneratorExit into the generator; shut down cleanly
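.close() deserves a quick demo: it raises GeneratorExit at the paused yield, so a try/finally gives the coroutine a guaranteed place to release resources:

```python
events = []

def tick_logger():
    """Toy coroutine that records every tick it receives."""
    events.append("opened")
    try:
        while True:
            tick = (yield)
            events.append(tick)
    finally:
        events.append("closed")  # Runs on .close(), GC, or an unhandled error

log = tick_logger()
next(log)             # Prime: advance to the first yield
log.send("AAPL@172")
log.close()           # GeneratorExit is raised inside; the finally block runs
print(events)         # ['opened', 'AAPL@172', 'closed']
```

This is the same mechanism the bridge in Section 4 relies on when it calls bot.close() at shutdown.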

The .throw() method is particularly powerful. Instead of crashing your pipeline when bad data appears, you can inject the error directly at the coroutine's pause point and let it handle recovery internally.

Building a Finite State Machine with yield

A coroutine's "current line number" is its state. No state variables. No if state == "WATCHING" branching at the top. The control flow itself encodes the state.

from enum import Enum, auto

class BotState(Enum):   # Shown only for contrast -- the coroutine below
    WATCHING = auto()   # never reads these; its paused position *is* the state
    ACTIVE   = auto()

def trading_bot(entry_threshold: float = 150.0,
                exit_threshold:  float = 200.0):
    """
    A coroutine FSM with two states:
      WATCHING β†’ waiting for a low price to enter a position
      ACTIVE   β†’ holding a position, waiting to exit at profit
    """
    print("[BOT] Initialized. State: WATCHING")
    entry_price: float = 0.0

    while True:
        try:
            # ── STATE: WATCHING ──────────────────────────────
            while True:
                price: float = (yield)               # Wait for next tick
                print(f"[WATCHING] AAPL @ ${price:.2f}")
                if price <= entry_threshold:
                    entry_price = price
                    print(f"[SIGNAL] Entry at ${entry_price:.2f} β†’ switching to ACTIVE")
                    break                            # Transition to ACTIVE

            # ── STATE: ACTIVE ────────────────────────────────
            while True:
                price = (yield)                      # Wait for next tick
                pnl   = price - entry_price
                print(f"[ACTIVE]   AAPL @ ${price:.2f} | PnL: ${pnl:+.2f}")
                if price >= exit_threshold:
                    print(f"[SIGNAL] Exit at ${price:.2f} | Profit: ${pnl:.2f} β†’ switching to WATCHING")
                    break                            # Transition back to WATCHING

        except ValueError as e:
            # Bad tick injected via .throw() β€” reset to WATCHING without crashing
            print(f"[ERROR] Bad data received: {e}. Resetting to WATCHING.")
            entry_price = 0.0
            # Loop continues: back to WATCHING state

4. The Grand Finale: The High-Frequency Trading Bot

Let's wire everything together. Four components. One elegant pipeline.

The Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    PIPELINE OVERVIEW                     β”‚
β”‚                                                          β”‚
β”‚  [Source]          [Filter]           [Sink]             β”‚
β”‚  market_ticker()──►aapl_stream ──────►trading_bot()      β”‚
β”‚  (Generator)       (Gen. Expression)  (Coroutine FSM)    β”‚
β”‚       β”‚                  β”‚                  β”‚            β”‚
β”‚   Produces all       Passes only        Consumes AAPL   β”‚
β”‚   symbols lazily     AAPL ticks         ticks, manages  β”‚
β”‚                                         state internally β”‚
β”‚                                                          β”‚
β”‚                   [Bridge]                               β”‚
β”‚                   for loop with                          β”‚
β”‚                   .send() / .throw()                     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

The Complete System

import random
import itertools
import sys

# ── COMPONENT 1: THE SOURCE ──────────────────────────────────────
def market_ticker():
    """Infinite stream of market ticks. Never terminates."""
    symbols = ['AAPL', 'GOOG', 'MSFT', 'AMZN']
    for _ in itertools.count():
        yield {
            'symbol': random.choice(symbols),
            'price':  round(random.uniform(100, 300), 2),
        }

# ── COMPONENT 2: THE FILTER ──────────────────────────────────────
def build_pipeline():
    ticker    = market_ticker()
    # Keep only AAPL ticks; bad data is injected later, in the bridge
    aapl_stream = (
        t for t in ticker
        if t['symbol'] == 'AAPL'
    )
    return aapl_stream

# ── COMPONENT 3: THE CONSUMER (FSM Coroutine) ────────────────────
# (trading_bot as defined in Section 3 above)

# ── COMPONENT 4: THE BRIDGE ──────────────────────────────────────
def run(tick_limit: int = 30) -> None:
    stream = build_pipeline()
    bot    = trading_bot(entry_threshold=150.0, exit_threshold=200.0)

    # Prime the coroutine β€” advance it to the first yield
    next(bot)

    ticks_processed = 0
    for tick in stream:
        if ticks_processed >= tick_limit:
            break

        price = tick['price']

        # Simulate occasional bad data (1-in-10 chance)
        if random.random() < 0.1:
            bad_price = -abs(price)  # Corrupt tick: negative price
            try:
                # .throw() resumes the bot with the exception; the bot recovers
                # internally and runs to its next yield, so no re-priming needed
                bot.throw(ValueError(f"Negative price: {bad_price}"))
            except StopIteration:
                print("[BRIDGE] Bot shut down during error recovery.")
                break
        else:
            try:
                bot.send(price)      # Normal operation: push price to bot
            except StopIteration:
                print("[BRIDGE] Bot has shut down.")
                break

        ticks_processed += 1

    bot.close()  # Send GeneratorExit β€” clean shutdown
    print(f"\n[BRIDGE] Pipeline complete. Processed {ticks_processed} AAPL ticks.")

if __name__ == "__main__":
    run(tick_limit=30)

Why .throw() is the Pro Pattern

Most tutorials show .send() and call it a day. But .throw() is what makes a coroutine-based FSM production-grade.

The alternative to .throw() is sentinel values:

# Naive approach: use magic values to signal errors
bot.send(-1)  # Hope the bot understands -1 means "bad data"

This is fragile. It poisons your data channel with control signals. What if -1 is a legitimate (if unusual) price? What if you need to distinguish between different error types?

.throw() keeps the error channel and the data channel separate:

# The Bridge: clear separation of concerns
for tick in stream:
    if tick['price'] < 0:
        bot.throw(ValueError(f"Corrupt tick: {tick}"))  # Error channel
    else:
        bot.send(tick['price'])                          # Data channel

The coroutine catches it in a try/except at its yield pointβ€”exactly like a normal function. The state machine resets cleanly. The pipeline keeps running. Zero sentinel values. Zero ambiguity.


Conclusion: Why This Matters

Let's close with three concrete reasons this mental model changes how you write code.

1. Memory Efficiency: O(1) Space for Infinite Streams

# This processes a 10GB log file in constant memory
def find_errors(path: str):
    with open(path) as f:
        yield from (line for line in f if "ERROR" in line)

for error_line in find_errors("application.log"):
    alert(error_line)

No list. No .readlines(). A single line lives in memory at a time.

2. State Management: Your Line Number Is Your State

The trading_bot() coroutine has zero explicit state variables for its FSM transitions. Which inner while loop it is currently suspended in is the state. Python's own frame bookkeeping manages it.

Compare that to the class-based alternative:

# The non-generator version: manual state management
class TradingBot:
    def __init__(self):
        self.state = "WATCHING"    # Explicit state
        self.entry_price = 0.0

    def process(self, price: float) -> None:
        if self.state == "WATCHING":   # State checks everywhere
            ...
        elif self.state == "ACTIVE":
            ...

More code. More surface area for bugs. More branching to read and maintain.

3. Composability: UNIX Pipes for Your Data

Each component in our pipeline does exactly one thing: the source generates ticks, the filter screens symbols, the bot manages trades. They're connected by conventionβ€”the iterator protocolβ€”not by inheritance or tight coupling.

You can swap any component without touching the others:

# Swap source: real broker API instead of random data
ticker = broker_api.stream()           # Same interface, different source

# Swap filter: multiple symbols
stream = (t for t in ticker if t['symbol'] in {'AAPL', 'MSFT'})

# Swap sink: logging bot instead of trading bot
bot = audit_logger(output="trades.log")  # Same .send() interface

Small tools. Single responsibilities. Glued by protocol.
