DEV Community

Aaron Rose
The Async Iterator Part 2: Streaming Data and Real-World Patterns

The next morning, Timothy arrived at the library early, eager to apply what he'd learned about async iteration. He'd spent the evening refactoring his log analyzer, and now he wanted to tackle something more ambitious: building a real-time dashboard that streamed library statistics.

"Margaret, I've got async iteration working," Timothy said, pulling up his screen. "But now I'm trying to do something practical, and I keep running into problems."

Margaret looked over with interest. "Show me what you're working on."

Timothy pulled up his code:

import asyncio

async def get_checkouts():
    """Get recent checkouts"""
    checkouts = []
    async for record in read_checkout_log():
        checkouts.append(record)
    return checkouts

async def get_returns():
    """Get recent returns"""
    returns = []
    async for record in read_return_log():
        returns.append(record)
    return returns

async def get_active_users():
    """Get active users"""
    users = []
    async for user in read_active_sessions():
        users.append(user)
    return users

"This works," Timothy said, "but it feels clunky. I'm writing the same pattern over and over—create a list, iterate, append. There must be a better way."

Margaret smiled. "There is. Let me introduce you to async comprehensions."

Async Comprehensions: Syntactic Sugar for Async Iteration

"Remember list comprehensions?" Margaret asked, typing:

# Regular list comprehension
numbers = [1, 2, 3, 4, 5]
squares = [n ** 2 for n in numbers]
print(squares)  # [1, 4, 9, 16, 25]

"Of course," Timothy said. "They're more elegant than building lists with loops."

"Async comprehensions are the same concept, but for async iterators," Margaret explained. She refactored his code:

import asyncio

async def read_checkout_log():
    """Simulate reading checkout records"""
    for i in range(5):
        await asyncio.sleep(0.1)
        yield {"id": i, "book": f"Book_{i}", "user": f"User_{i}"}

async def read_return_log():
    """Simulate reading return records"""
    for i in range(3):
        await asyncio.sleep(0.1)
        yield {"id": i, "book": f"Book_{i}"}

# OLD WAY: Manual iteration
async def get_checkouts_old():
    checkouts = []
    async for record in read_checkout_log():
        checkouts.append(record)
    return checkouts

# NEW WAY: Async list comprehension
async def get_checkouts_new():
    return [record async for record in read_checkout_log()]

async def demo():
    print("Old way:")
    result1 = await get_checkouts_old()
    print(f"  Got {len(result1)} checkouts")

    print("\nNew way:")
    result2 = await get_checkouts_new()
    print(f"  Got {len(result2)} checkouts")

asyncio.run(demo())

Output:

Old way:
  Got 5 checkouts

New way:
  Got 5 checkouts

"Just add async before the for!" Timothy said. "That's much cleaner."

"Right. The syntax is [expr async for item in async_iterator]," Margaret said. "You can also use filtering:"

import asyncio

async def read_checkout_log():
    """Generate checkout records"""
    records = [
        {"id": 1, "book": "Python Guide", "user": "Alice", "overdue": False},
        {"id": 2, "book": "Async Patterns", "user": "Bob", "overdue": True},
        {"id": 3, "book": "Data Structures", "user": "Carol", "overdue": False},
        {"id": 4, "book": "Algorithms", "user": "Dave", "overdue": True},
    ]
    for record in records:
        await asyncio.sleep(0.1)
        yield record

async def get_overdue_books():
    """Get only overdue books using filtering"""
    return [
        record['book'] 
        async for record in read_checkout_log() 
        if record['overdue']
    ]

async def demo():
    overdue = await get_overdue_books()
    print(f"Overdue books: {overdue}")

asyncio.run(demo())

Output:

Overdue books: ['Async Patterns', 'Algorithms']

"You can filter right in the comprehension!" Timothy said.

All Three Comprehension Types

Margaret showed him the complete picture:

import asyncio

async def generate_records():
    """Generate sample records"""
    for i in range(5):
        await asyncio.sleep(0.05)
        yield {"id": i, "value": i * 10, "category": "even" if i % 2 == 0 else "odd"}

async def demo_all_comprehensions():
    # Async LIST comprehension
    values_list = [r['value'] async for r in generate_records()]
    print(f"List: {values_list}")

    # Async SET comprehension
    categories_set = {r['category'] async for r in generate_records()}
    print(f"Set: {categories_set}")

    # Async DICT comprehension
    id_to_value = {r['id']: r['value'] async for r in generate_records()}
    print(f"Dict: {id_to_value}")

asyncio.run(demo_all_comprehensions())

Output:

List: [0, 10, 20, 30, 40]
Set: {'even', 'odd'}
Dict: {0: 0, 1: 10, 2: 20, 3: 30, 4: 40}

"Just like regular comprehensions," Margaret explained, "you have list, set, and dict versions. The syntax is always async for inside the comprehension."

She wrote out the patterns:

Async Comprehension Syntax:

List:  [expr async for item in async_iter]
Set:   {expr async for item in async_iter}
Dict:  {key: val async for item in async_iter}

With filtering:
[expr async for item in async_iter if condition]

Regular comprehensions:  for item in iter
Async comprehensions:    async for item in async_iter
                         ^^^^^ just add async!

When NOT to Use Async Comprehensions

"One warning," Margaret said. "Async comprehensions build the entire result in memory. If you're processing a huge stream of data, this can be a problem."

She demonstrated:

import asyncio

async def huge_dataset():
    """Simulate a massive dataset"""
    for i in range(1_000_000):
        await asyncio.sleep(0)  # Yield control
        yield i

# BAD: Builds entire list in memory
async def process_all_at_once():
    print("Loading everything into memory...")
    # This creates a list with 1 million items!
    all_data = [x async for x in huge_dataset()]
    print(f"Loaded {len(all_data)} items")
    return sum(all_data)

# GOOD: Process one item at a time
async def process_streaming():
    print("Processing as a stream...")
    total = 0
    count = 0
    async for x in huge_dataset():
        total += x
        count += 1
    print(f"Processed {count} items")
    return total

# Both get the same result, but streaming uses constant memory

"The comprehension forces you to wait for all data before processing," Margaret explained. "Sometimes you want that—when you need the complete dataset. But for large streams, iterate directly with async for."

Timothy nodded. "So comprehensions are for collecting finite datasets, but direct iteration is for processing streams."

"Exactly. Think about it like regular Python: you wouldn't load a 10GB file into a list comprehension. Same principle applies here."
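There is also a middle ground worth knowing: async generator expressions. Wrapping the comprehension syntax in parentheses produces a lazy async generator instead of a list, so nothing is collected up front. A minimal sketch (the `numbers` source here is just for illustration):

```python
import asyncio

async def numbers():
    """Small async source for demonstration"""
    for i in range(5):
        await asyncio.sleep(0)  # Yield control to the event loop
        yield i

async def demo():
    # Parentheses make this lazy: no list is built in memory
    doubled = (n * 2 async for n in numbers())

    total = 0
    async for value in doubled:  # Items are produced one at a time
        total += value
    print(total)  # Sum of 0, 2, 4, 6, 8 -> 20

asyncio.run(demo())
```

This gives comprehension-style syntax with the constant memory profile of direct iteration.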

Real Async File I/O with aiofiles

"Let's move to something more practical," Margaret said. "In Part 1, I mentioned that Python's built-in open() blocks the event loop. Let me show you the real solution."

She opened a terminal and typed:

pip install aiofiles

Then she wrote:

import asyncio
import aiofiles

async def read_log_async():
    """Demonstrates async file I/O"""
    print("Setting up demo file...")

    # Create a sample file (for demo purposes)
    async with aiofiles.open("sample.log", "w") as f:
        await f.write("Line 1\nLine 2\nLine 3\n")

    print("Reading with async I/O...")

    # Read asynchronously - doesn't block event loop!
    async with aiofiles.open("sample.log", "r") as f:
        async for line in f:
            print(f"  {line.strip()}")

async def other_task():
    """Runs concurrently"""
    for i in range(3):
        print(f"[Background task {i}]")
        await asyncio.sleep(0.1)

async def demo():
    # Run file reading alongside other work
    await asyncio.gather(
        read_log_async(),
        other_task()
    )

asyncio.run(demo())

Output:

Setting up demo file...
Reading with async I/O...
[Background task 0]
  Line 1
  Line 2
[Background task 1]
  Line 3
[Background task 2]

"See how the background task runs during file reading?" Margaret pointed out. "With regular open(), the entire event loop would block on the file I/O, and the background task couldn't run until reading was complete."

"The key differences," she continued, typing out a comparison:

Regular File I/O vs Async File I/O:

Regular (blocks event loop):
    with open(path, 'r') as f:
        for line in f:
            process(line)

Async (cooperates with event loop):
    async with aiofiles.open(path, 'r') as f:
        async for line in f:
            await process(line)

Differences:
1. async with instead of with
2. aiofiles.open() instead of open()
3. async for instead of for
4. Can await inside the loop

Async Context Managers + Async Iteration

Timothy studied the code. "I see async with and async for together. How does that work?"

"Great observation," Margaret said. "Let me explain async context managers first, then show how they combine with async iteration."

She typed:

import asyncio

class DatabaseConnection:
    """Simulated async database connection"""

    async def __aenter__(self):
        """Called when entering 'async with' block"""
        print("  Opening database connection...")
        await asyncio.sleep(0.1)  # Simulate connection time
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Called when exiting 'async with' block"""
        print("  Closing database connection...")
        await asyncio.sleep(0.1)  # Simulate cleanup
        self.connected = False

    async def query(self, sql):
        """Execute a query"""
        if not self.connected:
            raise RuntimeError("Not connected!")
        await asyncio.sleep(0.05)
        # Simulate returning results
        return [f"Result {i}" for i in range(3)]

async def demo():
    # Async context manager ensures cleanup
    async with DatabaseConnection() as db:
        print("Inside async with block")
        results = await db.query("SELECT * FROM books")
        print(f"Got {len(results)} results")
    print("Outside async with block - connection closed")

asyncio.run(demo())

Output:

  Opening database connection...
Inside async with block
Got 3 results
  Closing database connection...
Outside async with block - connection closed

"An async context manager uses async with and must define __aenter__ and __aexit__ as coroutines," Margaret explained. "It's like a regular context manager, but async."

She drew a diagram:

Regular Context Manager:
    with resource():
        use()
    # Cleanup happens here

Async Context Manager:
    async with resource():
        await use()
    # Async cleanup happens here

Both guarantee cleanup, but async version can await during setup/teardown
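For simple cases you don't have to write `__aenter__` and `__aexit__` by hand: the standard library's `contextlib.asynccontextmanager` decorator turns an async generator into an async context manager. A minimal sketch (the connection here is simulated, not a real database):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_connection():
    """Everything before the yield is setup; everything after is teardown"""
    print("  Opening connection...")
    await asyncio.sleep(0.1)  # Simulate connecting
    conn = {"connected": True}
    try:
        yield conn  # The value bound by 'async with ... as'
    finally:
        print("  Closing connection...")
        await asyncio.sleep(0.1)  # Simulate cleanup
        conn["connected"] = False

async def demo():
    async with database_connection() as conn:
        print(f"Connected: {conn['connected']}")  # Connected: True

asyncio.run(demo())
```

The decorator handles the `__aenter__`/`__aexit__` plumbing; the `try`/`finally` around the `yield` guarantees teardown even if the body raises.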

Combining Async Context Managers with Async Iteration

"Now here's where it gets powerful," Margaret said. "You can combine async with and async for for safe streaming."

import asyncio

class DatabaseCursor:
    """Async database cursor with iteration"""

    def __init__(self):
        self.position = 0
        self.closed = False

    async def __aenter__(self):
        """Set up cursor"""
        print("  Opening cursor...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Clean up cursor"""
        print("  Closing cursor...")
        self.closed = True
        await asyncio.sleep(0.1)

    def __aiter__(self):
        """Return self as async iterator"""
        return self

    async def __anext__(self):
        """Fetch next row"""
        if self.closed:
            raise RuntimeError("Cursor is closed!")

        if self.position >= 5:
            raise StopAsyncIteration

        await asyncio.sleep(0.05)  # Simulate fetch time
        row = {"id": self.position, "title": f"Book {self.position}"}
        self.position += 1
        return row

async def fetch_books():
    """Fetch books using cursor"""
    # Both context manager AND iterator!
    async with DatabaseCursor() as cursor:
        print("Fetching books:")
        async for book in cursor:
            print(f"  {book['title']}")
    print("Cursor automatically closed")

asyncio.run(fetch_books())

Output:

  Opening cursor...
Fetching books:
  Book 0
  Book 1
  Book 2
  Book 3
  Book 4
  Closing cursor...
Cursor automatically closed

"This pattern is incredibly common with databases," Margaret said. "The cursor is both a context manager (for resource cleanup) and an async iterator (for fetching rows)."

Timothy was impressed. "So async with ensures the cursor gets closed, and async for streams the results?"

"Exactly. Even if an error occurs during iteration, __aexit__ will be called to clean up the cursor."
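To see that guarantee in action, here is a stripped-down sketch of the same cursor shape where the loop body raises partway through. The cursor records whether `__aexit__` ran:

```python
import asyncio

class TrackedCursor:
    """Minimal cursor that records whether it was closed"""

    def __init__(self):
        self.position = 0
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.closed = True  # Runs even if the loop body raised

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.position >= 5:
            raise StopAsyncIteration
        await asyncio.sleep(0)
        self.position += 1
        return self.position

async def demo():
    cursor = TrackedCursor()
    try:
        async with cursor:
            async for row in cursor:
                if row == 2:
                    raise ValueError("Something went wrong mid-iteration")
    except ValueError as e:
        print(f"Caught: {e}")
    print(f"Cursor closed: {cursor.closed}")  # Cursor closed: True

asyncio.run(demo())
```

Even though the `async for` body raised, `async with` still ran `__aexit__` on the way out.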

Error Handling in Async Iteration

"Speaking of errors," Timothy said, "what happens if an async iterator fails partway through?"

"Excellent question," Margaret said. "Let me show you."

import asyncio

async def flaky_data_source():
    """Iterator that fails after a few items"""
    for i in range(5):
        await asyncio.sleep(0.1)

        if i == 3:
            raise ValueError(f"Database error at record {i}")

        yield {"id": i, "data": f"Record {i}"}

async def process_with_error_handling():
    """Handle errors during iteration"""
    print("Processing with error handling:")
    processed = 0

    try:
        async for record in flaky_data_source():
            print(f"  Processing: {record['data']}")
            processed += 1
    except ValueError as e:
        print(f"  ERROR: {e}")
        print(f"  Successfully processed {processed} records before error")

    print(f"Finished (processed {processed} records)")

asyncio.run(process_with_error_handling())

Output:

Processing with error handling:
  Processing: Record 0
  Processing: Record 1
  Processing: Record 2
  ERROR: Database error at record 3
  Successfully processed 3 records before error
Finished (processed 3 records)

"The async for loop stops when an exception is raised," Margaret explained. "You handle it the same way as regular iteration—with try/except."

She showed a more robust pattern:

import asyncio

async def retry_data_source():
    """Iterator with retry logic"""
    for i in range(5):
        await asyncio.sleep(0.1)
        yield {"id": i, "data": f"Record {i}"}

async def process_with_recovery():
    """Process with automatic recovery"""
    print("Processing with recovery:")
    max_retries = 3
    retry_count = 0
    processed = 0

    while retry_count < max_retries:
        try:
            async for record in retry_data_source():
                print(f"  Processing: {record['data']}")
                processed += 1
            # Success - break out of retry loop
            break
        except Exception as e:
            retry_count += 1
            print(f"  ERROR: {e}")
            if retry_count < max_retries:
                print(f"  Retrying... (attempt {retry_count + 1}/{max_retries})")
                await asyncio.sleep(1)  # Backoff
            else:
                print(f"  Max retries reached. Processed {processed} records.")
                raise

asyncio.run(process_with_recovery())

"This pattern is common for network requests or database queries that might fail transiently," Margaret said.
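The `retry_data_source` above never actually fails, so the retry path never fires. Here is a hypothetical variant where the source fails on its first attempt and succeeds on the second. Note one caveat of this naive pattern: retrying restarts the stream from the beginning, so records processed before the failure are seen again.

```python
import asyncio

attempts = {"count": 0}

async def sometimes_flaky_source():
    """Hypothetical source: fails on the first attempt, succeeds afterwards"""
    attempts["count"] += 1
    for i in range(3):
        await asyncio.sleep(0.01)
        if attempts["count"] == 1 and i == 1:
            raise ConnectionError("Transient network failure")
        yield {"id": i}

async def process_with_retry(max_retries=3):
    for attempt in range(1, max_retries + 1):
        processed = 0
        try:
            async for record in sometimes_flaky_source():
                processed += 1
            return processed  # Success: whole stream consumed
        except ConnectionError as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt == max_retries:
                raise
            await asyncio.sleep(0.01)  # Backoff before retrying

print(asyncio.run(process_with_retry()))  # First attempt fails, second processes all 3
```

If reprocessing is expensive or has side effects, you need either idempotent processing or a source that can resume from a checkpoint.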

Cleanup on Error

"One critical point," Margaret added. "Async generators still get cleaned up when things go wrong. If the generator itself raises, its finally blocks run as the exception propagates; if it's abandoned mid-iteration, Python calls aclose() to trigger the same cleanup."

import asyncio

async def generator_with_cleanup():
    """Generator that needs cleanup"""
    print("  [Generator] Starting")
    try:
        for i in range(5):
            await asyncio.sleep(0.1)

            if i == 2:
                raise ValueError("Simulated error")

            yield i
    finally:
        print("  [Generator] Cleanup running (finally block)")

async def demo_cleanup_on_error():
    print("Demo: Cleanup on error")
    try:
        async for value in generator_with_cleanup():
            print(f"  Got value: {value}")
    except ValueError as e:
        print(f"  Caught error: {e}")

    print("Finished")

asyncio.run(demo_cleanup_on_error())

Output:

Demo: Cleanup on error
  [Generator] Starting
  Got value: 0
  Got value: 1
  [Generator] Cleanup running (finally block)
  Caught error: Simulated error
Finished

"See how the finally block runs even though we raised an error?" Margaret pointed out. "Python ensures cleanup happens. This is why finally blocks in generators are perfect for closing files, releasing locks, or cleaning up resources."

"One important detail," Margaret added. "If your cleanup code needs to await anything—like closing a network connection or flushing a buffer—your generator must be an async def. The finally block can contain await statements because the whole generator is a coroutine."

She showed an example:

import asyncio

async def acquire_async_resource():
    """Simulated async resource acquisition"""
    await asyncio.sleep(0.1)
    return {"name": "DatabaseConnection"}

async def generator_with_async_cleanup():
    """Cleanup that requires await"""
    print("  Acquiring resource")
    resource = await acquire_async_resource()

    try:
        yield "data"
    finally:
        # This await works because we're in async def
        print("  Closing resource...")
        await asyncio.sleep(0.1)
        print("  Resource closed")

async def demo():
    async for item in generator_with_async_cleanup():
        print(f"  Got: {item}")

asyncio.run(demo())

Output:

  Acquiring resource
  Got: data
  Closing resource...
  Resource closed

"If this were a regular generator (def instead of async def), you couldn't use await in the finally block," Margaret explained. "The async def is what allows the cleanup to be asynchronous."

Streaming API Pagination

"Let's tackle a real-world problem," Margaret said. "Imagine our library has an API that returns paginated results. You want to iterate over all books, but the API only gives you 10 at a time."

import asyncio

async def fetch_page(page_number, page_size=10):
    """Simulate fetching a page from an API"""
    await asyncio.sleep(0.2)  # Simulate network delay

    # Simulate total of 35 books
    total_books = 35
    start = page_number * page_size
    end = min(start + page_size, total_books)

    if start >= total_books:
        return []

    return [
        {"id": i, "title": f"Book {i}"}
        for i in range(start, end)
    ]

async def paginated_books():
    """Async generator that handles pagination"""
    page = 0
    while True:
        books = await fetch_page(page)

        if not books:
            break  # No more pages

        for book in books:
            yield book

        page += 1

async def demo_pagination():
    print("Fetching all books via pagination:")
    count = 0

    async for book in paginated_books():
        count += 1
        print(f"  {book['title']}")

    print(f"\nTotal books fetched: {count}")

asyncio.run(demo_pagination())

Output:

Fetching all books via pagination:
  Book 0
  Book 1
  Book 2
  ...
  Book 33
  Book 34

Total books fetched: 35

"The consumer doesn't know about pagination," Margaret explained. "They just use async for and get all the books. The generator handles the complexity of fetching multiple pages."

Timothy grinned. "That's way cleaner than managing page numbers in the calling code."

"Right. This is a common pattern for REST APIs, database queries with cursors, or any paginated data source."
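Some APIs paginate with an opaque continuation token instead of page numbers. The same generator shape works, carrying the token between requests. A sketch, where `fetch_page_by_token` is a stand-in for a real API call:

```python
import asyncio

async def fetch_page_by_token(token=None, page_size=10):
    """Stand-in for a token-based API: returns (items, next_token)"""
    await asyncio.sleep(0.01)  # Simulate network delay
    start = int(token) if token else 0
    total = 25
    end = min(start + page_size, total)
    items = [{"id": i} for i in range(start, end)]
    next_token = str(end) if end < total else None  # None means last page
    return items, next_token

async def all_items():
    """Flat stream over a token-paginated API"""
    token = None
    while True:
        items, token = await fetch_page_by_token(token)
        for item in items:
            yield item
        if token is None:
            break

async def demo():
    count = 0
    async for item in all_items():
        count += 1
    print(f"Fetched {count} items")  # Fetched 25 items

asyncio.run(demo())
```

As before, the consumer sees one flat stream; only the generator knows the API hands back a token.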

Buffering and Prefetching

"One more advanced pattern," Margaret said. "Sometimes you want to fetch data ahead of time to minimize waiting."

import asyncio

async def slow_data_source():
    """Simulates a slow data source"""
    for i in range(10):
        await asyncio.sleep(0.5)  # Slow fetch!
        yield i

async def buffered_iterator(source, buffer_size=3):
    """Prefetch items in a background task so fetching overlaps processing"""
    queue = asyncio.Queue(maxsize=buffer_size)
    sentinel = object()  # Marks end of stream

    async def producer():
        async for item in source:
            await queue.put(item)
        await queue.put(sentinel)

    task = asyncio.create_task(producer())
    try:
        while True:
            item = await queue.get()
            if item is sentinel:
                break
            yield item
    finally:
        task.cancel()

async def demo_buffering():
    # Each item takes 0.5s to fetch and 0.5s to process
    print("Without buffering (fetch, then process, one at a time):")
    start = asyncio.get_running_loop().time()
    count = 0
    async for item in slow_data_source():
        await asyncio.sleep(0.5)  # Simulate processing
        count += 1
        if count >= 3:
            break
    elapsed = asyncio.get_running_loop().time() - start
    print(f"  Processed 3 items in {elapsed:.1f}s\n")

    print("With buffering (fetching overlaps processing):")
    start = asyncio.get_running_loop().time()
    count = 0
    async for item in buffered_iterator(slow_data_source(), buffer_size=3):
        await asyncio.sleep(0.5)  # Simulate processing
        count += 1
        if count >= 3:
            break
    elapsed = asyncio.get_running_loop().time() - start
    print(f"  Processed 3 items in {elapsed:.1f}s")

asyncio.run(demo_buffering())

"Buffering can significantly improve throughput when processing slow streams," Margaret explained. "You're essentially pipelining the fetching and processing."

"This demonstrates the buffering concept," Margaret added. "In production, you might use libraries like aiostream that provide pre-built buffering utilities. But understanding how to build it yourself helps you know what's happening under the hood."

Testing Async Iterators

As they wrapped up, Timothy asked, "How do I test these things? My usual testing approaches don't work with async code."

"Good question," Margaret said. "Let me show you pytest with async support."

# test_async_iterators.py
import pytest
import asyncio

async def sample_generator():
    """Generator to test"""
    for i in range(3):
        await asyncio.sleep(0.01)
        yield i * 2

@pytest.mark.asyncio
async def test_async_generator():
    """Test async generator output"""
    results = []
    async for value in sample_generator():
        results.append(value)

    assert results == [0, 2, 4]

@pytest.mark.asyncio
async def test_async_comprehension():
    """Test async comprehension"""
    results = [v async for v in sample_generator()]
    assert results == [0, 2, 4]

@pytest.mark.asyncio
async def test_error_handling():
    """Test error handling in async iteration"""
    async def failing_generator():
        yield 1
        raise ValueError("Test error")

    with pytest.raises(ValueError):
        async for _ in failing_generator():
            pass

"Install pytest-asyncio and mark your tests with @pytest.mark.asyncio," Margaret said. "Then write tests like normal async functions."

The Pattern Library

Margaret summarized the patterns they'd covered on a whiteboard:

Async Iterator Patterns:

1. Async Comprehensions
   [x async for x in source() if condition]
   - Clean syntax for collecting results
   - Loads everything into memory
   - Use for finite datasets

2. Direct Iteration
   async for x in source():
       await process(x)
   - Constant memory usage
   - Use for large streams
   - Process one item at a time

3. Context Manager + Iterator
   async with resource() as r:
       async for item in r:
           await process(item)
   - Guaranteed cleanup
   - Common with databases, files, connections

4. Pagination Pattern
   async def paginated():
       page = 0
       while True:
           batch = await fetch(page)
           if not batch: break
           for item in batch:
               yield item
           page += 1
   - Hides pagination complexity
   - Consumer sees flat stream

5. Buffering/Prefetching
   - Pre-fetch items to minimize waiting
   - Improves throughput for slow sources
   - More complex but can be worth it

6. Error Handling
   try:
       async for item in source():
           await process(item)
   except Exception:
       # Handle or retry
   - Same as regular iteration
   - Cleanup happens automatically

The Takeaway

Timothy closed his laptop, now equipped with practical patterns for real-world async iteration.

Async comprehensions provide clean syntax: Just add async before for in list/set/dict comprehensions.

Comprehensions load everything into memory: Use direct async for for large streams instead.

aiofiles provides real async file I/O: Python's built-in open() blocks the event loop.

async with + async for is a powerful combination: Context managers ensure cleanup, iterators stream data.

Async context managers define __aenter__ and __aexit__: Both must be coroutines.

Error handling works the same as regular iteration: Use try/except around async for loops.

Generator cleanup happens even when iteration fails: finally blocks run as exceptions propagate, and Python calls aclose() on abandoned generators.

finally blocks can contain await in async generators: Because the entire generator is a coroutine.

Pagination pattern hides API complexity: Consumers see a flat stream, generator handles pages.

Buffering can improve throughput: Pre-fetch items to minimize waiting between items.

Libraries like aiostream provide pre-built patterns: But building your own helps you understand the mechanics.

pytest-asyncio enables async test cases: Mark tests with @pytest.mark.asyncio.

finally blocks ensure cleanup: Use them for releasing resources in generators.

Choose the right pattern for your use case: Comprehensions for collecting, direct iteration for streaming, buffering for performance.


Margaret and Timothy had covered the essential patterns for production async iteration. Timothy's dashboard was now streaming library statistics efficiently, his file processing no longer blocked the event loop, and he understood how to handle errors gracefully in async code.

The library was quiet in the afternoon. As Timothy packed up, he realized that async iteration wasn't just about syntax—it was about choosing the right pattern for each situation, understanding when to collect versus stream, and ensuring resources were properly managed even when things went wrong.


Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.
