In the previous two lessons, we learned about Future and core asynchronous programming techniques, which are mainly used for handling single asynchronous results (like a single network request or a single file read/write). But in real-world development, we often need to handle continuous streams of asynchronous events – such as real-time chat messages, sensor data, or file download progress. Today we'll learn about Stream, Dart's core tool for working with these continuous data flows.
I. Fundamental Differences Between Stream and Future: Single Value vs Multiple Values
Before diving into Stream, let's clarify its core differences from Future:
Characteristic | Future | Stream |
---|---|---|
Number of results | Single result (or error) | Multiple consecutive results (or errors) |
Lifecycle | Pending → Completed (success/failure) | Pending → Emitting data → Completed/Errored |
Typical scenarios | Single network request, single file operation | Real-time data, event listening, progress updates |
In real-life terms:
- A Future is like a package delivery: sent once, received once (either successfully delivered or lost).
- A Stream is like a TV broadcast: continuously sending frames that you can watch until you turn off the TV (unsubscribe).
Code Comparison: Behavioral Differences
// Future: Returns a single result
Future<String> fetchSingleData() async {
  await Future.delayed(Duration(seconds: 1));
  return "Single data result";
}

// Stream: Returns multiple consecutive results
Stream<String> fetchStreamData() async* {
  await Future.delayed(Duration(seconds: 1));
  yield "First data"; // Emit first result
  await Future.delayed(Duration(seconds: 1));
  yield "Second data"; // Emit second result
  await Future.delayed(Duration(seconds: 1));
  yield "Third data"; // Emit third result
}

void main() async {
  // Handling Future
  print("Fetching Future data");
  String futureResult = await fetchSingleData();
  print("Future result: $futureResult");

  // Handling Stream
  print("\nFetching Stream data");
  await for (String streamResult in fetchStreamData()) {
    print("Stream result: $streamResult");
  }
  print("Stream finished");
}

// Output order:
// Fetching Future data
// Future result: Single data result
//
// Fetching Stream data
// (1-second wait)
// Stream result: First data
// (1-second wait)
// Stream result: Second data
// (1-second wait)
// Stream result: Third data
// Stream finished
Key differences:
- Future uses await to get one result and then completes.
- Stream uses an await for loop to continuously receive multiple results until the stream ends.
II. Creating Streams: From Simple to Complex Generation Methods
Dart provides several ways to create Streams for different scenarios.
1. From Iterables: Stream.fromIterable
If you have a collection of data and want to send it as a stream, use Stream.fromIterable:
void main() {
  // Create Stream from List
  Stream<String> fruitStream = Stream.fromIterable([
    "Apple",
    "Banana",
    "Orange",
  ]);

  // Listen to the stream
  fruitStream.listen((fruit) {
    print("Received fruit: $fruit");
  });
}

// Output:
// Received fruit: Apple
// Received fruit: Banana
// Received fruit: Orange
This method is useful for converting static data into a stream, so that it can be processed with the same logic as dynamically generated streams.
2. With Async Generators: async* and yield
The most common way to create dynamic streams is using async generator functions:
- Declare with async* (indicates return type is Stream)
- Use yield to emit stream data (each yield adds one item to the stream)
- Use yield* to forward data from another stream (nested streams)
// Generate a number stream from 1 to max, emitting one number per second
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // Emit current number
  }
}

// Forward another stream with prefix added
Stream<String> prefixStream(Stream<int> numbers) async* {
  // Use yield* to forward number stream and convert to strings
  yield* numbers.map((number) => "Number: $number");
}

void main() {
  // Create number stream
  Stream<int> numbers = countStream(3);
  // Create prefixed stream
  Stream<String> prefixedNumbers = prefixStream(numbers);
  // Listen to prefixed stream
  prefixedNumbers.listen((data) {
    print(data);
  });
}

// Output (one line per second):
// Number: 1
// Number: 2
// Number: 3
3. Manual Control with StreamController
For more complex scenarios (like manually triggering stream data or converting external events to streams), use StreamController:
import 'dart:async';

void main() {
  // Create stream controller
  final controller = StreamController<String>();
  // Get the controller's stream (for external listening)
  final stream = controller.stream;

  // Listen to the stream
  stream.listen(
    (data) {
      print("Received data: $data");
    },
    onDone: () {
      print("Stream closed");
    },
  );

  // Manually add data to the stream
  controller.add("First message");
  controller.add("Second message");

  // Add final message after 3 seconds and close the stream
  Future.delayed(Duration(seconds: 3), () {
    controller.add("Last message");
    // Close stream (must call, otherwise stream remains active)
    controller.close();
  });
}

// Output:
// Received data: First message
// Received data: Second message
// (3-second wait)
// Received data: Last message
// Stream closed
StreamController is a versatile tool for creating streams, especially useful for converting callback events (like button clicks or timers) to streams.
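As a rough illustration of converting a callback API into a stream, the sketch below wraps the callback of Timer.periodic in a StreamController. The helper name tickStream is hypothetical (it is not part of dart:async), and the sketch assumes a single listener and does not handle pausing.

```dart
import 'dart:async';

/// Hypothetical helper: exposes Timer.periodic's callback as a stream of ticks.
Stream<int> tickStream(Duration interval) {
  Timer? timer;
  var tick = 0;
  late final StreamController<int> controller;
  controller = StreamController<int>(
    // Start the timer only once someone listens.
    onListen: () {
      timer = Timer.periodic(interval, (_) => controller.add(tick++));
    },
    // Stop the timer when the listener cancels.
    onCancel: () => timer?.cancel(),
  );
  return controller.stream;
}

Future<void> main() async {
  // take(3) cancels the subscription after three ticks,
  // which triggers onCancel and stops the timer.
  final ticks =
      await tickStream(const Duration(milliseconds: 100)).take(3).toList();
  print(ticks); // [0, 1, 2]
}
```

Starting the timer in onListen and stopping it in onCancel keeps the callback source alive only while something is actually listening.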
4. Other Common Creation Methods
- Stream.periodic: Emits data at regular intervals (like a timer)
// Emit current time every 2 seconds
Stream<DateTime> timeStream = Stream.periodic(
  Duration(seconds: 2),
  (count) => DateTime.now(),
);
- Stream.value: Creates a stream with just one value (similar to Future.value)
Stream<String> singleValueStream = Stream.value("Only one value");
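A Stream.periodic stream never ends on its own, so a quick experiment usually bounds it with take. The sketch below is only an illustration; the helper name firstSquares is made up for this example.

```dart
/// Hypothetical helper: the first [n] square numbers, one every 100 ms.
Future<List<int>> firstSquares(int n) => Stream<int>.periodic(
      const Duration(milliseconds: 100),
      (count) => count * count, // count starts at 0
    ).take(n).toList();

Future<void> main() async {
  print(await firstSquares(4)); // [0, 1, 4, 9]

  // Stream.value emits its single value and then closes.
  print(await Stream.value("Only one value").toList()); // [Only one value]
}
```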
III. Listening to Streams: Handling Data, Errors, and Completion
Listening to streams is the core operation for working with Streams. The listen method handles three types of events: data events, error events, and completion events.
1. Basic Listening
Stream<int> numberStreamWithError() async* {
  yield 1;
  yield 2;
  // Emit error
  throw Exception("Error occurred during processing");
  yield 3; // Code after error won't execute
}

void main() {
  numberStreamWithError().listen(
    // 1. Handle data events (required)
    (data) {
      print("Received data: $data");
    },
    // 2. Handle error events (optional)
    onError: (error) {
      print("Caught error: ${error.toString()}");
    },
    // 3. Handle completion event (optional)
    onDone: () {
      print("Stream completed");
    },
  );
}

// Output:
// Received data: 1
// Received data: 2
// Caught error: Exception: Error occurred during processing
// Stream completed
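Note: When an async* generator throws, the function stops running: the error is delivered to onError, the remaining yield statements never execute, and the stream then closes, which triggers onDone. This behavior is specific to generators. In general, a stream can keep emitting data after an error unless the listener passes cancelOnError: true.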
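To see how error handling differs outside of generators, here is a minimal sketch using a StreamController, which can add an error and then keep emitting. The helper name collectEvents is hypothetical; it records what a listener observes with and without cancelOnError.

```dart
import 'dart:async';

/// Hypothetical helper: records the events one listener observes.
Future<List<String>> collectEvents({required bool cancelOnError}) {
  final log = <String>[];
  final finished = Completer<List<String>>();
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) => log.add("data: $data"),
    onError: (Object error) {
      log.add("error: $error");
      // With cancelOnError the subscription ends here and onDone never fires.
      if (cancelOnError) finished.complete(log);
    },
    onDone: () => finished.complete(log),
    cancelOnError: cancelOnError,
  );

  controller
    ..add(1)
    ..addError("boom")
    ..add(2)
    ..close();
  return finished.future;
}

Future<void> main() async {
  print(await collectEvents(cancelOnError: false));
  // [data: 1, error: boom, data: 2]
  print(await collectEvents(cancelOnError: true));
  // [data: 1, error: boom]
}
```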
2. Listening with await for Loop
await for is another way to listen to streams with syntax similar to synchronous loops. It's ideal for streams without errors (or with errors handled externally by try-catch):
Stream<String> messageStream() async* {
  yield "Hello";
  await Future.delayed(Duration(seconds: 1));
  yield "World";
}

void main() async {
  try {
    // await for loop continuously receives stream data until stream ends
    await for (String message in messageStream()) {
      print("Message: $message");
    }
    print("Stream processing completed");
  } catch (e) {
    print("Caught error: $e");
  }
}

// Output:
// Message: Hello
// (1-second wait)
// Message: World
// Stream processing completed
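The example above never actually reaches its catch block. As a small sketch of the error path, the stream below (flakyStream, a made-up name) throws after two values, and the surrounding try-catch receives the error while the loop ends early.

```dart
/// Hypothetical stream that fails after two values.
Stream<int> flakyStream() async* {
  yield 1;
  yield 2;
  throw StateError("connection lost"); // simulated failure
}

Future<List<String>> consume() async {
  final log = <String>[];
  try {
    await for (final value in flakyStream()) {
      log.add("value: $value");
    }
    log.add("done"); // skipped, because the loop exits by throwing
  } on StateError catch (e) {
    log.add("caught: ${e.message}");
  }
  return log;
}

Future<void> main() async {
  print(await consume()); // [value: 1, value: 2, caught: connection lost]
}
```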
3. Two Stream Types: Single-Subscription vs. Broadcast
Dart has two stream types that affect listening behavior significantly:
- Single-subscription streams:
- Can be listened to only once, by a single subscriber
- Data is one-time and can't be retrieved if missed (like network data streams)
- Most streams are single-subscription by default
- Broadcast streams:
- Can be listened to by multiple subscribers simultaneously
- New subscribers only receive new data after subscription (not historical data)
- Ideal for event notifications (like button clicks, state changes)
Converting a single-subscription stream to a broadcast stream:
// Generate a number stream from 1 to max, emitting one number per second
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // Emit current number
  }
}

void main() {
  // Create single-subscription stream
  Stream<int> singleStream = countStream(3);
  // Convert to broadcast stream
  Stream<int> broadcastStream = singleStream.asBroadcastStream();

  // First subscriber
  broadcastStream.listen((data) => print("Subscriber 1: $data"));

  // Add second subscriber after 1.5 seconds
  Future.delayed(Duration(milliseconds: 1500), () {
    broadcastStream.listen((data) => print("Subscriber 2: $data"));
  });
}

// Output (one number per second; subscriber 2 joins after 1.5 seconds):
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 2: 2 (Second subscriber starts receiving from subscription point)
// Subscriber 1: 3
// Subscriber 2: 3
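A broadcast stream can also be created directly with StreamController.broadcast(). The sketch below is a minimal illustration of the "new subscribers only receive new data" rule; the function name lateSubscriberDemo is hypothetical.

```dart
import 'dart:async';

/// Hypothetical demo: returns what an early and a late listener each received.
Future<Map<String, List<String>>> lateSubscriberDemo() async {
  final controller = StreamController<String>.broadcast();
  final earlyEvents = <String>[];
  final lateEvents = <String>[];

  controller.stream.listen(earlyEvents.add);
  controller.add("a");
  // Let the first event be delivered before the second listener joins.
  await Future<void>.delayed(Duration.zero);

  controller.stream.listen(lateEvents.add);
  controller.add("b");
  await controller.close(); // completes after all listeners received "done"

  return {"early": earlyEvents, "late": lateEvents};
}

Future<void> main() async {
  print(await lateSubscriberDemo()); // {early: [a, b], late: [b]}
}
```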
IV. Unsubscribing: Avoiding Memory Leaks
Cancel a stream subscription when it is no longer needed. Otherwise the stream and its listener stay alive and keep consuming resources, which can cause memory leaks.
1. Unsubscribing with StreamSubscription
The listen method returns a StreamSubscription object. Call its cancel() method to unsubscribe:
import 'dart:async';

void main() {
  // Create stream that emits data continuously (every 500ms)
  Stream<int> continuousStream = Stream.periodic(
    Duration(milliseconds: 500),
    (count) => count,
  );

  // Subscribe to stream and get subscription object
  StreamSubscription<int> subscription = continuousStream.listen((data) {
    print("Received data: $data");
  });

  // Cancel subscription after 3 seconds
  Future.delayed(Duration(seconds: 3), () {
    print("Canceling subscription");
    subscription.cancel(); // Unsubscribe, stream stops emitting data
  });
}

// Output (typical; the event at the 3-second mark races with the cancel timer):
// Received data: 0
// Received data: 1
// Received data: 2
// Received data: 3
// Received data: 4
// Received data: 5
// Canceling subscription
2. Best Practices for Unsubscribing
- In StatefulWidget (Flutter): Cancel in dispose method
@override
void dispose() {
  _subscription?.cancel(); // Cancel when widget is destroyed
  super.dispose();
}
- Limit received items with take: Automatically unsubscribes
// Receive only first 3 items, then automatically unsubscribe
continuousStream.take(3).listen((data) {
  print("Received data: $data");
});
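When the stopping condition depends on the data itself, one option is to cancel from inside the data handler. The sketch below assumes a hypothetical helper named collectUntil; it is one possible pattern, not the only way to do this.

```dart
import 'dart:async';

/// Hypothetical helper: collects values until one reaches [limit],
/// then cancels the subscription from inside the data handler.
Future<List<int>> collectUntil(Stream<int> source, int limit) {
  final received = <int>[];
  final finished = Completer<List<int>>();
  late final StreamSubscription<int> subscription;

  subscription = source.listen(
    (value) {
      received.add(value);
      if (value >= limit) {
        // cancel() returns a Future; report the result once cleanup is done.
        subscription.cancel().then((_) => finished.complete(received));
      }
    },
    // Covers sources that end before the limit is reached.
    onDone: () => finished.complete(received),
  );
  return finished.future;
}

Future<void> main() async {
  final source = Stream<int>.periodic(
    const Duration(milliseconds: 100),
    (count) => count,
  );
  print(await collectUntil(source, 3)); // [0, 1, 2, 3]
}
```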
V. Stream Transformations: Processing and Converting Stream Data
Streams' power lies in their ability to process data through transformations (like filtering, mapping, merging), similar to collection operation chains.
1. Common Transformation Methods
- map: Convert data type
Stream<int> numbers = countStream(3);
Stream<String> stringNumbers = numbers.map((n) => "Number $n");
- where: Filter data
Stream<int> numbers = countStream(5);
Stream<int> evenNumbers = numbers.where(
  (n) => n % 2 == 0,
); // Keep only even numbers
- take/skip: Take first N items/skip first N items
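// countStream returns a single-subscription stream, which can be listened to
// only once, so each example creates its own stream.
countStream(5).take(3).listen(print); // Outputs 1,2,3
countStream(5).skip(2).listen(print); // Outputs 3,4,5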
- expand: Expand single item into multiple items
Stream<int> numbers = countStream(2);
// Expand each number into [n, n*10]
Stream<int> expanded = numbers.expand((n) => [n, n * 10]);
expanded.listen(print); // Outputs 1,10,2,20
2. Chained Transformation Example
void main() {
  // Generate number stream from 1 to 10
  Stream<int> numbers = Stream.fromIterable(List.generate(10, (i) => i + 1));

  // Chained transformations: filter even numbers → multiply by 10 → convert to string
  numbers
      .where((n) => n % 2 == 0) // Keep even numbers: 2,4,6,8,10
      .map((n) => n * 10) // Multiply by 10: 20,40,60,80,100
      .map((n) => "Result: $n") // Convert to string
      .listen((data) => print(data));
}

// Output:
// Result: 20
// Result: 40
// Result: 60
// Result: 80
// Result: 100
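Because a transformation chain only depends on the Stream interface, it can be packaged as a function and reused for static and time-based sources alike. The following is a small sketch; describeEvens and slowCount are hypothetical names introduced for this example.

```dart
/// Hypothetical reusable pipeline: keep even numbers, scale, and format.
Stream<String> describeEvens(Stream<int> source) =>
    source.where((n) => n.isEven).map((n) => "Result: ${n * 10}");

/// Hypothetical time-based source: emits 1..max with a short delay.
Stream<int> slowCount(int max) async* {
  for (var i = 1; i <= max; i++) {
    await Future<void>.delayed(const Duration(milliseconds: 50));
    yield i;
  }
}

Future<void> main() async {
  // Static source
  print(await describeEvens(Stream.fromIterable([1, 2, 3, 4])).toList());
  // [Result: 20, Result: 40]

  // Time-based source, same pipeline
  print(await describeEvens(slowCount(4)).toList());
  // [Result: 20, Result: 40]
}
```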
VI. Practical Application Scenarios
Streams are widely used in real-world development, especially in these scenarios:
1. Real-time data updates:
import 'dart:math';

// Simulate real-time stock price updates
Stream<double> stockPriceStream(String symbol) async* {
  final random = Random(); // Create the generator once and reuse it
  double price = 100.0;
  while (true) {
    await Future.delayed(Duration(seconds: 1));
    // Random price fluctuation in the range [-1, 1)
    price += (random.nextDouble() - 0.5) * 2;
    yield price;
  }
}
2. File download progress:
// Simulate file download progress (0% ~ 100%)
Stream<int> downloadProgress() async* {
  for (int progress = 0; progress <= 100; progress += 5) {
    await Future.delayed(Duration(milliseconds: 300));
    yield progress;
  }
}
3. Event listening:
// Convert button click events to stream (Flutter example)
StreamController<void> _buttonClicks = StreamController<void>();

// Called when button is tapped
void _onButtonTap() {
  _buttonClicks.add(null); // Emit click event
}

// Listen to click events
_buttonClicks.stream.listen((_) {
  print("Button clicked");
});
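The click snippet above is a fragment that relies on Flutter. As a rough, pure-Dart approximation of the same idea, the sketch below uses a made-up TapSource class (an assumption for illustration, not a Flutter API) backed by a broadcast controller, and closes the controller when it is no longer needed.

```dart
import 'dart:async';

/// Hypothetical stand-in for a widget that exposes tap events as a stream.
class TapSource {
  final _taps = StreamController<void>.broadcast();

  Stream<void> get onTap => _taps.stream;

  void tap() => _taps.add(null); // Emit a tap event

  Future<void> dispose() => _taps.close(); // Release the controller
}

Future<int> countTaps() async {
  final source = TapSource();
  var count = 0;
  source.onTap.listen((_) => count++);

  source.tap();
  source.tap();
  // close() delivers the pending events and then the done event.
  await source.dispose();
  return count;
}

Future<void> main() async {
  print(await countTaps()); // 2
}
```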