DEV Community

ArshTechPro

Understanding AsyncStream and AsyncThrowingStream in Swift

Swift's concurrency model introduced powerful tools for handling asynchronous sequences of values. Among these, AsyncStream and AsyncThrowingStream stand out as elegant solutions that integrate directly with Swift's modern async/await syntax.

What is AsyncStream?

AsyncStream is a type that allows you to create an asynchronous sequence of values that can be consumed using for await loops.

Think of AsyncStream as a bridge between the old world of completion handlers and the new world of structured concurrency. It provides a way to emit values over time asynchronously, similar to how you might use a Publisher in Combine, but with a simpler, more Swift-native approach.
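As a minimal sketch of the idea above, the following creates a stream and consumes it with a for await loop. The names makeCounter and collectCounter are hypothetical helpers invented for this illustration, not part of any API.

```swift
// Hypothetical helper: emits the integers 1...count, then finishes.
func makeCounter(upTo count: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        for value in stride(from: 1, through: count, by: 1) {
            continuation.yield(value)   // buffered until the consumer reads it
        }
        continuation.finish()           // ends the consumer's for await loop
    }
}

// Hypothetical helper: collects every element and returns once the stream finishes.
func collectCounter() async -> [Int] {
    var received: [Int] = []
    for await value in makeCounter(upTo: 3) {
        received.append(value)
    }
    return received
}
```

Calling collectCounter() from an async context returns the values in the order they were yielded.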

Buffering

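Sometimes values are produced faster than they're read. AsyncStream buffers them automatically.

  • By default, the buffer is unbounded (the .unbounded buffering policy), so it stores as many values as needed.
  • You can limit this by passing a buffering policy such as .bufferingNewest(n) or .bufferingOldest(n) when creating the stream.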
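To make the buffer limit concrete, here is a small sketch that yields five values before anything is read and then checks which ones survive under a given policy. The function name collectBuffered is a hypothetical helper for this illustration; the bufferingPolicy parameter and its cases come from the standard library.

```swift
// Hypothetical helper: yields 1...5 up front, then reads whatever the buffer kept.
func collectBuffered(
    policy: AsyncStream<Int>.Continuation.BufferingPolicy
) async -> [Int] {
    let stream = AsyncStream<Int>(bufferingPolicy: policy) { continuation in
        // All five values are yielded before the consumer starts reading,
        // so the buffering policy decides which of them are retained.
        for value in 1...5 {
            continuation.yield(value)
        }
        continuation.finish()
    }

    var kept: [Int] = []
    for await value in stream {
        kept.append(value)
    }
    return kept
}
```

With .bufferingNewest(2) only the last two values remain, with .bufferingOldest(2) only the first two, and with .unbounded all five are delivered.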

What is AsyncThrowingStream?

AsyncThrowingStream is similar to AsyncStream but with one crucial difference: it can throw errors. This makes it perfect for scenarios where your asynchronous operations might fail and you need to propagate those errors to the consuming code.

Key Differences

  • AsyncStream: For sequences that won't throw errors
  • AsyncThrowingStream: For sequences that might throw errors during iteration
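The difference can be sketched in a few lines. SampleError, makeFailingStream, and consumeFailingStream are hypothetical names used only for this illustration.

```swift
// Hypothetical error type for the illustration.
enum SampleError: Error {
    case failed
}

// Hypothetical helper: yields two values, then ends the stream with an error.
func makeFailingStream() -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield(1)
        continuation.yield(2)
        continuation.finish(throwing: SampleError.failed)
    }
}

// Hypothetical helper: reports the values seen and whether iteration threw.
func consumeFailingStream() async -> (values: [Int], didThrow: Bool) {
    var values: [Int] = []
    do {
        // `for try await` is required because iteration can throw.
        for try await value in makeFailingStream() {
            values.append(value)
        }
        return (values, false)
    } catch {
        return (values, true)
    }
}
```

Values that were already buffered are delivered first; the error surfaces only after the consumer has read them.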

Example 1: Network Data Streaming with AsyncStream

Let's look at a practical example where we stream network data chunks:

import Foundation

class NetworkStreamer {
    func streamData(from url: URL) -> AsyncStream<Data> {
        AsyncStream { continuation in
            // Run the work in an unstructured task and keep a handle to it
            let task = Task {
                do {
                    let (data, _) = try await URLSession.shared.data(from: url)

                    // Simulate streaming by sending data in chunks
                    let chunkSize = 1024
                    let chunks = stride(from: 0, to: data.count, by: chunkSize).map {
                        data.subdata(in: $0..<min($0 + chunkSize, data.count))
                    }

                    for chunk in chunks {
                        // Stop producing once the task has been cancelled
                        if Task.isCancelled {
                            break
                        }
                        continuation.yield(chunk)
                        // Simulate delay between chunks
                        try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds
                    }

                    continuation.finish()
                } catch {
                    // AsyncStream cannot throw, so log the error and end the stream
                    print("Error occurred: \(error)")
                    continuation.finish()
                }
            }

            // Cancel the task when the stream terminates (for example, when the
            // consumer is cancelled); an unstructured Task is not cancelled automatically
            continuation.onTermination = { @Sendable _ in
                task.cancel()
            }
        }
    }
}

// Usage
func consumeNetworkStream() async {
    let streamer = NetworkStreamer()
    let url = URL(string: "https://httpbin.org/json")!

    for await dataChunk in streamer.streamData(from: url) {
        print("Received chunk of size: \(dataChunk.count) bytes")
        // Process each chunk as it arrives
    }

    print("Stream completed")
}

Example 2: File Processing with AsyncThrowingStream

Here's an example that demonstrates error handling while processing files:

import Foundation

enum FileProcessingError: Error {
    case fileNotFound
    case invalidFormat
    case processingFailed(String)
}

class FileProcessor {
    func processLargeFile(at path: String) -> AsyncThrowingStream<String, Error> {
        AsyncThrowingStream { continuation in
            // Run the work in an unstructured task and keep a handle to it
            let task = Task {
                do {
                    // Check if file exists
                    guard FileManager.default.fileExists(atPath: path) else {
                        continuation.finish(throwing: FileProcessingError.fileNotFound)
                        return
                    }

                    let content = try String(contentsOfFile: path, encoding: .utf8)
                    let lines = content.components(separatedBy: .newlines)

                    for (index, line) in lines.enumerated() {
                        // Stop producing once the task has been cancelled
                        if Task.isCancelled {
                            break
                        }

                        // Skip empty lines after the first one without failing
                        if line.isEmpty && index > 0 {
                            continue
                        }

                        if line.contains("ERROR") {
                            // Simulate a processing error
                            continuation.finish(throwing: FileProcessingError.processingFailed("Found error marker at line \(index + 1)"))
                            return
                        }

                        // Process and yield the line
                        let processedLine = "Processed: \(line.uppercased())"
                        continuation.yield(processedLine)

                        // Simulate processing delay
                        try await Task.sleep(nanoseconds: 50_000_000) // 0.05 seconds
                    }

                    continuation.finish()

                } catch {
                    // Propagate read or cancellation errors to the consumer
                    continuation.finish(throwing: error)
                }
            }

            // Cancel the task when the stream terminates; an unstructured Task
            // is not cancelled automatically
            continuation.onTermination = { @Sendable _ in
                task.cancel()
            }
        }
    }
}

// Usage
func consumeFileStream() async {
    let processor = FileProcessor()
    let filePath = "/path/to/your/file.txt"

    do {
        for try await processedLine in processor.processLargeFile(at: filePath) {
            print("\(processedLine)")
        }
        print("File processing completed successfully")
    } catch FileProcessingError.fileNotFound {
        print("File not found")
    } catch FileProcessingError.processingFailed(let message) {
        print("Processing failed: \(message)")
    } catch {
        print("Unexpected error: \(error)")
    }
}

Key Concepts to Remember

Creating Streams

Both AsyncStream and AsyncThrowingStream are created using a closure that receives a continuation parameter. This continuation is your interface for:

  • Yielding values: Use continuation.yield(value) to emit values
  • Finishing successfully: Use continuation.finish() to signal completion
  • Throwing errors: Use continuation.finish(throwing: error) for AsyncThrowingStream

Consuming Streams

  • Use for await loops to consume values from an AsyncStream
  • For AsyncThrowingStream, use for try await and wrap the loop in a do-catch block to handle errors
  • The loop automatically terminates when the stream finishes
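Because both stream types conform to AsyncSequence, the loop can also consume a transformed stream. The sketch below uses the standard filter and map operators; evenSquares is a hypothetical helper name chosen for this illustration.

```swift
// Hypothetical helper: squares the even numbers from a small stream.
func evenSquares() async -> [Int] {
    let numbers = AsyncStream<Int> { continuation in
        for number in 1...6 {
            continuation.yield(number)
        }
        continuation.finish()
    }

    var result: [Int] = []
    // filter and map return new asynchronous sequences; nothing runs
    // until the for await loop starts pulling values.
    for await value in numbers.filter({ $0 % 2 == 0 }).map({ $0 * $0 }) {
        result.append(value)
    }
    return result
}
```

The loop ends on its own once the underlying stream finishes, just as it does for an untransformed stream.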

Cancellation Handling

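Use continuation.onTermination to clean up resources when the stream ends, whether it finished normally or the consuming task was cancelled. If the stream is fed by an unstructured Task, cancel that task in the handler, because it is not cancelled automatically.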
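One way to wire this up is sketched below, assuming a producer that runs in its own unstructured Task. The names makeTicker and firstTicks, and the one-millisecond default interval, are hypothetical choices for this illustration.

```swift
// Hypothetical helper: emits 0, 1, 2, ... until the stream is terminated.
func makeTicker(intervalNanoseconds: UInt64 = 1_000_000) -> AsyncStream<Int> {
    AsyncStream { continuation in
        // Keep a handle to the producer so it can be cancelled later.
        let producer = Task {
            var tick = 0
            while !Task.isCancelled {
                continuation.yield(tick)
                tick += 1
                // Sleep throws when the task is cancelled; the loop condition
                // then ends the producer.
                try? await Task.sleep(nanoseconds: intervalNanoseconds)
            }
            continuation.finish()
        }

        // Runs when the stream terminates; stops the producer so it does not
        // keep working for a consumer that is gone.
        continuation.onTermination = { @Sendable _ in
            producer.cancel()
        }
    }
}

// Hypothetical helper: reads only the first `count` ticks, then stops iterating.
func firstTicks(_ count: Int) async -> [Int] {
    var ticks: [Int] = []
    for await tick in makeTicker().prefix(count) {
        ticks.append(tick)
    }
    return ticks
}
```

The handler receives a termination reason as its argument, which this sketch ignores because the cleanup is the same in every case.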

When to Use Each Type

Use AsyncStream when:

  • Converting delegate callbacks to async/await
  • Streaming data that won't fail
  • Creating simple asynchronous sequences
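As a rough sketch of the callback conversion case, the code below wraps a callback-based source in an AsyncStream. startReadings is a hypothetical stand-in for a delegate-style or callback-style API, with made-up sample values; readingStream and collectReadings are likewise names invented for this illustration.

```swift
// Hypothetical callback-based API: calls `onReading` once per sample,
// then `onDone` when there is nothing more to deliver.
func startReadings(
    onReading: @escaping @Sendable (Double) -> Void,
    onDone: @escaping @Sendable () -> Void
) {
    for sample in [20.5, 21.0, 21.4] {
        onReading(sample)
    }
    onDone()
}

// Bridges the callbacks into a stream: each callback becomes a yield,
// and the completion callback finishes the stream.
func readingStream() -> AsyncStream<Double> {
    AsyncStream { continuation in
        startReadings(
            onReading: { reading in
                continuation.yield(reading)
            },
            onDone: {
                continuation.finish()
            }
        )
    }
}

// Hypothetical helper: gathers every reading from the bridged stream.
func collectReadings() async -> [Double] {
    var readings: [Double] = []
    for await reading in readingStream() {
        readings.append(reading)
    }
    return readings
}
```

With a real delegate, the same shape applies: the delegate methods call yield and finish on a stored continuation, and onTermination is the place to detach the delegate.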

Use AsyncThrowingStream when:

  • Performing network operations that might fail
  • Performing file I/O operations
  • Handling any scenario where errors need to be propagated through the stream

Conclusion

AsyncStream and AsyncThrowingStream are powerful tools that make it easy to work with asynchronous sequences in Swift. By understanding when and how to use each type, you can build more robust and efficient asynchronous applications.

Top comments (1)

ArshTechPro

AsyncStream conforms to AsyncSequence, providing a convenient way to create an asynchronous sequence without manually implementing an asynchronous iterator.