An asynchronous iterator is an object that, in an asynchronous environment, yields data items one at a time without blocking the rest of the program while it waits for the next item. It's commonly used in the following situations:
- Reading data from the network
- Fetching results from database queries
- Reading from a large file line by line
- Processing real-time data streams
Asynchronous iterators have several characteristics:
- They are consumed with `async for`
- `__anext__` is defined with `async def` (or the whole iterator is written as an `async def` generator), whereas `__aiter__` is an ordinary method that returns the iterator
- While waiting for each data item, other tasks can continue running
- They are well suited to I/O-bound operations
import asyncio
import random
# Asynchronous Iterator Class
class AsyncDataStream:
    """Asynchronous iterator that yields list items after a simulated delay.

    Before returning each item, ``__anext__`` sleeps for a random duration
    drawn uniformly from ``delay_range``; the sleep stands in for a slow
    I/O operation such as a network request or a file read.

    Args:
        data_list: Indexable collection of items to yield, in order.
        delay_range: ``(low, high)`` bounds in seconds, unpacked into
            ``random.uniform`` to pick each item's delay.
    """

    def __init__(self, data_list, delay_range=(0.1, 0.5)):
        self.data = data_list
        self.delay_range = delay_range
        # Create the cursor here so that ``__anext__`` also works when it is
        # awaited directly, without a preceding ``__aiter__`` call.
        self.index = 0

    def __aiter__(self):
        """Rewind the cursor to the first item and return the iterator itself."""
        self.index = 0
        return self

    async def __anext__(self):
        """Return the next item after a random delay.

        Raises:
            StopAsyncIteration: When every item has already been returned.
        """
        if self.index >= len(self.data):
            raise StopAsyncIteration
        # Simulate asynchronous operations (like network requests, file reading, etc.)
        delay = random.uniform(*self.delay_range)
        await asyncio.sleep(delay)
        item = self.data[self.index]
        self.index += 1
        return item
# Consume asynchronous iterator using async for
async def process_data():
    """Consume an ``AsyncDataStream`` of five sample items with ``async for``.

    Prints a start banner, one line per item as it arrives, and a
    completion message once the stream is exhausted.
    """
    sample_items = [
        "Data Item 1",
        "Data Item 2",
        "Data Item 3",
        "Data Item 4",
        "Data Item 5",
    ]
    stream = AsyncDataStream(sample_items)

    print("Starting data stream processing...")
    async for entry in stream:
        print(f"Processing: {entry}")
    print("All data processing completed!")
# Asynchronous generator example
async def async_generator_example(count=3, delay=0.2):
    """Yield ``count`` labelled items, pausing ``delay`` seconds before each.

    Asynchronous generators are a more concise way to create asynchronous
    iterators: the ``async def`` + ``yield`` combination replaces a class
    with ``__aiter__``/``__anext__`` methods.

    Args:
        count: Number of items to produce (default 3).
        delay: Seconds passed to ``asyncio.sleep`` before each item
            (default 0.2).

    Yields:
        ``"Generated Item 1"``, ``"Generated Item 2"``, ... up to ``count``.
    """
    for i in range(count):
        # Simulate asynchronous operation
        await asyncio.sleep(delay)
        yield f"Generated Item {i + 1}"
async def use_async_generator():
    """Print a heading, then every item produced by ``async_generator_example``."""
    print("\nUsing asynchronous generator:")
    generated = async_generator_example()
    async for value in generated:
        print(f"Received: {value}")
# Asynchronous iteration with exception handling
async def process_with_error_handling():
    """Iterate a stream that fails part-way through and report the failure.

    The local ``FaultyStream`` raises ``ValueError`` when asked for its third
    item. The ``async for`` loop sits inside ``try``/``except``, so the items
    received before the failure are printed, followed by the caught error.
    """

    class FaultyStream:
        """Async iterator counting up to 5 that raises on the third item."""

        def __aiter__(self):
            self.count = 0
            return self

        async def __anext__(self):
            if self.count < 5:
                await asyncio.sleep(0.1)
                self.count += 1
                # Simulate occasional errors
                if self.count == 3:
                    raise ValueError("Simulated data error")
                return f"Data {self.count}"
            raise StopAsyncIteration

    faulty = FaultyStream()
    try:
        async for received in faulty:
            print(f"Success: {received}")
    except ValueError as exc:
        print(f"Caught error: {exc}")
# Practical example: Simulate fetching data from multiple API endpoints
async def fetch_from_api(endpoint: str, delay: float) -> str:
    """Simulate an API request.

    Args:
        endpoint: Label of the endpoint being "requested"; it is only
            interpolated into the returned message.
        delay: Seconds to sleep via ``asyncio.sleep`` before responding.

    Returns:
        A message of the form ``"Data from <endpoint> (delay: <delay>s)"``.
    """
    await asyncio.sleep(delay)
    return f"Data from {endpoint} (delay: {delay}s)"
async def api_data_collector():
    """Collect data from multiple API endpoints.

    The endpoints are requested one after another through
    ``fetch_from_api``, and each response is yielded as soon as it arrives.
    """
    targets = (
        ("api/users", 0.3),
        ("api/products", 0.5),
        ("api/orders", 0.2),
    )
    for path, wait in targets:
        yield await fetch_from_api(path, wait)
async def main():
    """Run the four demonstrations in sequence."""
    # Example 1: Basic asynchronous iterator
    await process_data()

    # Example 2: Asynchronous generator
    await use_async_generator()

    # Example 3: Error handling
    print("\nAsynchronous iteration with error handling:")
    await process_with_error_handling()

    # Example 4: Practical application
    print("\nCollecting API data:")
    collector = api_data_collector()
    async for payload in collector:
        print(f"Received: {payload}")
# Run all examples
if __name__ == "__main__":
    # Start an event loop and drive main() to completion.
    asyncio.run(main())
Top comments (0)