DEV Community

seng
seng

Posted on

Asynchronous iterator in python

Asynchronous iterator is an object that, in an asynchronous environment, can yield data one by one and doesn't block the entire program while waiting for the next data. It's commonly used in the following situations:

  • Reading data from the network
  • Fetching results from database queries
  • Reading from a large file line by line
  • Processing real-time data streams

Asynchronous iterators have several characteristics:

  • Use async for to iterate through asynchronous iterators
  • The async keyword always precedes asynchronous iterator methods
  • While waiting for each data item, other tasks can continue running
  • It's suitable for I/O-intensive operations
import asyncio
import random

# Asynchronous Iterator Class
class AsyncDataStream:
    def __init__(self, data_list, delay_range=(0.1, 0.5)):
        self.data = data_list
        self.delay_range = delay_range

    def __aiter__(self):
        self.index = 0
        return self

    async def __anext__(self):
        if self.index >= len(self.data):
            raise StopAsyncIteration

        # Simulate asynchronous operations (like network requests, file reading, etc.)
        delay = random.uniform(*self.delay_range)
        await asyncio.sleep(delay)

        item = self.data[self.index]
        self.index += 1
        return item

# Consume asynchronous iterator using async for
async def process_data():
    data_stream = AsyncDataStream([
        "Data Item 1",
        "Data Item 2", 
        "Data Item 3",
        "Data Item 4",
        "Data Item 5"
    ])

    print("Starting data stream processing...")
    async for item in data_stream:
        print(f"Processing: {item}")

    print("All data processing completed!")

# Asynchronous generator example
async def async_generator_example():
    """Asynchronous generators are a more concise way to create asynchronous iterators"""
    for i in range(3):
        # Simulate asynchronous operation
        await asyncio.sleep(0.2)
        yield f"Generated Item {i + 1}"

async def use_async_generator():
    print("\nUsing asynchronous generator:")
    async for item in async_generator_example():
        print(f"Received: {item}")

# Asynchronous iteration with exception handling
async def process_with_error_handling():
    class FaultyStream:
        def __aiter__(self):
            self.count = 0
            return self

        async def __anext__(self):
            if self.count >= 5:
                raise StopAsyncIteration

            await asyncio.sleep(0.1)
            self.count += 1

            # Simulate occasional errors
            if self.count == 3:
                raise ValueError("Simulated data error")

            return f"Data {self.count}"

    stream = FaultyStream()
    try:
        async for item in stream:
            print(f"Success: {item}")
    except ValueError as e:
        print(f"Caught error: {e}")

# Practical example: Simulate fetching data from multiple API endpoints
async def fetch_from_api(endpoint, delay):
    """Simulate API request"""
    await asyncio.sleep(delay)
    return f"Data from {endpoint} (delay: {delay}s)"

async def api_data_collector():
    """Collect data from multiple API endpoints"""
    endpoints = [
        ("api/users", 0.3),
        ("api/products", 0.5),
        ("api/orders", 0.2)
    ]

    for endpoint, delay in endpoints:
        data = await fetch_from_api(endpoint, delay)
        yield data

async def main():
    # Example 1: Basic asynchronous iterator
    await process_data()

    # Example 2: Asynchronous generator
    await use_async_generator()

    # Example 3: Error handling
    print("\nAsynchronous iteration with error handling:")
    await process_with_error_handling()

    # Example 4: Practical application
    print("\nCollecting API data:")
    async for api_data in api_data_collector():
        print(f"Received: {api_data}")

# Run all examples
if __name__ == "__main__":
    asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

Top comments (0)