Swift's concurrency model introduced powerful tools for handling asynchronous sequences of values. Among these, AsyncStream and AsyncThrowingStream stand out as elegant solutions that work naturally with Swift's modern async/await syntax.
What is AsyncStream?
AsyncStream is a type that conforms to AsyncSequence. It lets you create an asynchronous sequence of values, consumed using for await loops, without manually implementing an asynchronous iterator.
Think of AsyncStream as a bridge between the old world of completion handlers and the new world of structured concurrency. It provides a way to emit values over time asynchronously, similar to how you might use a Publisher in Combine, but with a simpler, more Swift-native approach.
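To make the "bridge" idea concrete, here is a minimal sketch that wraps a callback-based function in an AsyncStream. The countdown function and its onTick/onDone callbacks are hypothetical stand-ins for legacy code, not part of any real API; only AsyncStream and its continuation come from the standard library.

```swift
// Hypothetical callback-based API, standing in for older code.
func countdown(from start: Int, onTick: (Int) -> Void, onDone: () -> Void) {
    for value in stride(from: start, through: 1, by: -1) {
        onTick(value)
    }
    onDone()
}

// Wrap the callbacks: each tick becomes a yielded value,
// and the completion callback finishes the stream.
func countdownStream(from start: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        countdown(
            from: start,
            onTick: { continuation.yield($0) },
            onDone: { continuation.finish() }
        )
    }
}

// Consume the wrapped API with a plain for await loop.
func collectCountdown() async -> [Int] {
    var values: [Int] = []
    for await value in countdownStream(from: 3) {
        values.append(value)
    }
    return values
}
```

Calling collectCountdown() should return the values in the order the callbacks fired, because the stream buffers everything yielded before the consumer starts reading.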
Buffering
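Sometimes values are produced faster than they're read. AsyncStream automatically buffers them.
- By default, the buffer is unbounded (the .unbounded buffering policy), so it stores as many values as needed.
- You can limit this by passing a bufferingPolicy of .bufferingOldest(n) or .bufferingNewest(n) when you create the stream.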
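The effect of each buffering policy is easiest to see when every value is produced before the consumer starts reading. The sketch below does exactly that; the helper name bufferedValues is an assumption made for illustration.

```swift
// Produces 1...5 up front, then reads back whatever the buffer kept.
func bufferedValues(
    policy: AsyncStream<Int>.Continuation.BufferingPolicy
) async -> [Int] {
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        // All five values are yielded before anyone reads from the stream.
        for value in 1...5 {
            continuation.yield(value)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

With .unbounded all five values arrive. With .bufferingNewest(2) only the last two (4 and 5) survive, and with .bufferingOldest(2) only the first two (1 and 2) do.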
What is AsyncThrowingStream?
AsyncThrowingStream is similar to AsyncStream but with one crucial difference: it can throw errors. This makes it perfect for scenarios where your asynchronous operations might fail and you need to propagate those errors to the consuming code.
Key Differences
- AsyncStream: For sequences that won't throw errors
- AsyncThrowingStream: For sequences that might throw errors during iteration
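As a small illustration of the difference, the sketch below builds an AsyncThrowingStream that fails partway through. The InvalidNumber error type and the numbers(from:) and parseAll helpers are hypothetical names chosen for this example.

```swift
// Hypothetical error type for this example.
struct InvalidNumber: Error, Equatable {
    let text: String
}

// Yields each string parsed as an Int; fails on the first unparsable one.
func numbers(from texts: [String]) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        for text in texts {
            guard let number = Int(text) else {
                continuation.finish(throwing: InvalidNumber(text: text))
                return
            }
            continuation.yield(number)
        }
        continuation.finish()
    }
}

// Iterating a throwing stream requires `for try await`.
func parseAll(_ texts: [String]) async -> (values: [Int], failure: InvalidNumber?) {
    var values: [Int] = []
    do {
        for try await number in numbers(from: texts) {
            values.append(number)
        }
        return (values, nil)
    } catch let error as InvalidNumber {
        return (values, error)
    } catch {
        return (values, nil)
    }
}
```

Values yielded before the failure are still delivered to the loop; the error surfaces only once they have been consumed.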
Example 1: Network Data Streaming with AsyncStream
Let's look at a practical example where we stream network data chunks:
import Foundation

class NetworkStreamer {
    func streamData(from url: URL) -> AsyncStream<Data> {
        AsyncStream { continuation in
            let task = Task {
                do {
                    let (data, _) = try await URLSession.shared.data(from: url)
                    // Simulate streaming by sending data in chunks
                    let chunkSize = 1024
                    let chunks = stride(from: 0, to: data.count, by: chunkSize).map {
                        data.subdata(in: $0..<min($0 + chunkSize, data.count))
                    }
                    for chunk in chunks {
                        // Check for cancellation before yielding
                        if Task.isCancelled {
                            break
                        }
                        continuation.yield(chunk)
                        // Simulate delay between chunks
                        try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds
                    }
                    continuation.finish()
                } catch {
                    // AsyncStream cannot throw, so log the error and end the stream
                    print("Error occurred: \(error)")
                    continuation.finish()
                }
            }
            // Cancel the producing task when the consumer stops iterating
            continuation.onTermination = { @Sendable _ in
                task.cancel()
            }
        }
    }
}
// Usage
func consumeNetworkStream() async {
    let streamer = NetworkStreamer()
    let url = URL(string: "https://httpbin.org/json")!
    for await dataChunk in streamer.streamData(from: url) {
        print("Received chunk of size: \(dataChunk.count) bytes")
        // Process each chunk as it arrives
    }
    print("Stream completed")
}
Example 2: File Processing with AsyncThrowingStream
Here's an example that demonstrates error handling while processing files:
import Foundation

enum FileProcessingError: Error {
    case fileNotFound
    case invalidFormat
    case processingFailed(String)
}

class FileProcessor {
    func processLargeFile(at path: String) -> AsyncThrowingStream<String, Error> {
        AsyncThrowingStream { continuation in
            let task = Task {
                do {
                    // Check if file exists
                    guard FileManager.default.fileExists(atPath: path) else {
                        continuation.finish(throwing: FileProcessingError.fileNotFound)
                        return
                    }
                    let content = try String(contentsOfFile: path, encoding: .utf8)
                    let lines = content.components(separatedBy: .newlines)
                    for (index, line) in lines.enumerated() {
                        // Check for cancellation
                        if Task.isCancelled {
                            break
                        }
                        // Skip empty lines but don't fail
                        if line.isEmpty {
                            continue
                        }
                        // Simulate a processing error
                        if line.contains("ERROR") {
                            continuation.finish(throwing: FileProcessingError.processingFailed("Found error marker at line \(index + 1)"))
                            return
                        }
                        // Process and yield the line
                        let processedLine = "Processed: \(line.uppercased())"
                        continuation.yield(processedLine)
                        // Simulate processing delay
                        try await Task.sleep(nanoseconds: 50_000_000) // 0.05 seconds
                    }
                    continuation.finish()
                } catch {
                    continuation.finish(throwing: error)
                }
            }
            // Cancel the producing task when the consumer stops iterating
            continuation.onTermination = { @Sendable _ in
                task.cancel()
            }
        }
    }
}
// Usage
func consumeFileStream() async {
    let processor = FileProcessor()
    let filePath = "/path/to/your/file.txt"
    do {
        for try await processedLine in processor.processLargeFile(at: filePath) {
            print(processedLine)
        }
        print("File processing completed successfully")
    } catch FileProcessingError.fileNotFound {
        print("File not found")
    } catch FileProcessingError.processingFailed(let message) {
        print("Processing failed: \(message)")
    } catch {
        print("Unexpected error: \(error)")
    }
}
Key Concepts to Remember
Creating Streams
Both AsyncStream and AsyncThrowingStream are created using a closure that receives a continuation parameter. This continuation is your interface for:
- Yielding values: Use continuation.yield(value) to emit values
- Finishing successfully: Use continuation.finish() to signal completion
- Throwing errors: Use continuation.finish(throwing: error) for AsyncThrowingStream
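The continuation also reports what happened to each value: yield returns a YieldResult. The sketch below logs those results. It assumes Swift 5.9 or later for AsyncStream.makeStream(of:bufferingPolicy:), and the describe and describeYields helpers are hypothetical names used only here.

```swift
// Turns a YieldResult into a short description.
func describe(_ result: AsyncStream<Int>.Continuation.YieldResult) -> String {
    switch result {
    case .enqueued:
        return "enqueued"
    case .dropped(let value):
        return "dropped \(value)"
    case .terminated:
        return "terminated"
    @unknown default:
        return "unknown"
    }
}

func describeYields() async -> [String] {
    // A buffer of one element that keeps the oldest value when full.
    let (stream, continuation) = AsyncStream.makeStream(
        of: Int.self,
        bufferingPolicy: .bufferingOldest(1)
    )
    var log: [String] = []

    log.append(describe(continuation.yield(1))) // buffer has room
    log.append(describe(continuation.yield(2))) // buffer full, new value is dropped
    continuation.finish()
    log.append(describe(continuation.yield(3))) // stream has already finished

    // Only the value that was actually buffered reaches the consumer.
    for await value in stream {
        log.append("received \(value)")
    }
    return log
}
```

Ignoring the result, as the examples above do, is fine for most code; checking it is mainly useful when dropped values matter.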
Consuming Streams
- Use for await loops to consume values from the stream
- For AsyncThrowingStream, use for try await and wrap the loop in a do-catch block to handle errors
- The loop automatically terminates when the stream finishes
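Under the hood, a for await loop asks the stream's iterator for the next value until it returns nil, which is why the loop ends when the stream finishes. A minimal sketch of that manual form follows; the function name sumManually is an assumption for illustration.

```swift
func sumManually() async -> Int {
    let stream = AsyncStream<Int> { continuation in
        for value in [10, 20, 30] {
            continuation.yield(value)
        }
        continuation.finish()
    }

    // Roughly what `for await value in stream` does for you.
    var iterator = stream.makeAsyncIterator()
    var total = 0
    while let value = await iterator.next() {
        total += value
    }
    // next() returned nil, so the stream has finished.
    return total
}
```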
Cancellation Handling
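Always implement proper cancellation handling using continuation.onTermination. The handler runs when the stream finishes or when the consuming task is cancelled, which makes it the place to cancel the producing task and clean up resources.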
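The sketch below shows onTermination reacting to a cancelled consumer. It assumes Swift 5.9 or later for AsyncStream.makeStream(of:), and it uses a second stream purely as a convenient way to report which termination reason was observed; the function name observeTermination is hypothetical.

```swift
func observeTermination() async -> String {
    // A stream that never yields on its own, plus a side channel for the report.
    let (numbers, numbersContinuation) = AsyncStream.makeStream(of: Int.self)
    let (events, eventsContinuation) = AsyncStream.makeStream(of: String.self)

    numbersContinuation.onTermination = { @Sendable termination in
        // Real code would release resources or cancel a producer task here.
        if case .cancelled = termination {
            eventsContinuation.yield("cancelled")
        } else {
            eventsContinuation.yield("finished")
        }
        eventsContinuation.finish()
    }

    // The consumer waits for values, then is cancelled from outside.
    let consumer = Task {
        for await _ in numbers {}
    }
    consumer.cancel()
    await consumer.value

    var reason = "none"
    for await event in events {
        reason = event
    }
    return reason
}
```

Cancelling the consuming task ends its for await loop and triggers the handler with the cancelled reason, without the producer side having to call finish().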
When to Use Each Type
Use AsyncStream when:
- Converting delegate callbacks to async/await
- Streaming data that won't fail
- Creating simple asynchronous sequences
Use AsyncThrowingStream when:
- Performing network operations that might fail
- Performing file I/O operations
- Handling any scenario where errors need to be propagated through the stream
Conclusion
AsyncStream and AsyncThrowingStream are powerful tools that make it easy to work with asynchronous sequences in Swift. By understanding when and how to use each type, you can build more robust and efficient asynchronous applications.