
Ge Ji


Dart Lesson 15: Stream — Processing continuous data streams

In the previous two lessons, we learned about Future and core asynchronous programming techniques, which are mainly used for handling single asynchronous results (like a single network request or a single file read/write). But in real-world development, we often need to handle continuous streams of asynchronous events – such as real-time chat messages, sensor data, or file download progress. Today we'll learn about Stream, Dart's core tool for working with these continuous data flows.

I. Fundamental Differences Between Stream and Future: Single Value vs Multiple Values

Before diving into Stream, let's clarify its core differences from Future:

Characteristic    | Future                                        | Stream
Number of results | Single result (or error)                      | Multiple consecutive results (or errors)
Lifecycle         | Pending → Completed (success/failure)         | Pending → Emitting data → Completed/Errored
Typical scenarios | Single network request, single file operation | Real-time data, event listening, progress updates

In real-life terms:

  • A Future is like a package delivery: sent once, received once (either successfully delivered or lost).
  • A Stream is like a TV broadcast: continuously sending frames that you can watch until you turn off the TV (unsubscribe).

Code Comparison: Behavioral Differences

// Future: Returns a single result
Future<String> fetchSingleData() async {
  await Future.delayed(Duration(seconds: 1));
  return "Single data result";
}

// Stream: Returns multiple consecutive results
Stream<String> fetchStreamData() async* {
  await Future.delayed(Duration(seconds: 1));
  yield "First data"; // Emit first result
  await Future.delayed(Duration(seconds: 1));
  yield "Second data"; // Emit second result
  await Future.delayed(Duration(seconds: 1));
  yield "Third data"; // Emit third result
}

void main() async {
  // Handling Future
  print("Fetching Future data");
  String futureResult = await fetchSingleData();
  print("Future result: $futureResult");

  // Handling Stream
  print("\nFetching Stream data");
  await for (String streamResult in fetchStreamData()) {
    print("Stream result: $streamResult");
  }
  print("Stream finished");
}

// Output order:
// Fetching Future data
// (1-second wait)
// Future result: Single data result
//
// Fetching Stream data
// (1-second wait)
// Stream result: First data
// (1-second wait)
// Stream result: Second data
// (1-second wait)
// Stream result: Third data
// Stream finished

Key differences:

  • Future uses await to get one result and then completes.
  • Stream uses an await for loop to continuously receive multiple results until the stream ends.

II. Creating Streams: From Simple to Complex Generation Methods

Dart provides several ways to create Streams for different scenarios.

1. From Iterables: Stream.fromIterable

If you have a collection of data and want to send it as a stream, use Stream.fromIterable:

void main() {
  // Create Stream from List
  Stream<String> fruitStream = Stream.fromIterable([
    "Apple",
    "Banana",
    "Orange",
  ]);

  // Listen to the stream
  fruitStream.listen((fruit) {
    print("Received fruit: $fruit");
  });
}

// Output:
// Received fruit: Apple
// Received fruit: Banana
// Received fruit: Orange

This method is useful for wrapping static data in a stream, so that it can be processed with the same logic as a dynamic stream.
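As a minimal sketch of that uniform handling, the helper below accepts any Stream<String>, so the same code consumes a static list wrapped with Stream.fromIterable and a dynamically generated stream. The names printAll and dynamicFruits are hypothetical and exist only for this illustration.

```dart
// Hypothetical helper: consumes any Stream<String>, static or dynamic,
// and returns how many items it received.
Future<int> printAll(Stream<String> source) async {
  var count = 0;
  await for (final item in source) {
    print("Item: $item");
    count++;
  }
  return count;
}

// A dynamically generated stream, for comparison (illustrative only).
Stream<String> dynamicFruits() async* {
  yield "Mango";
  await Future.delayed(Duration(milliseconds: 100));
  yield "Pear";
}

void main() async {
  // Static data wrapped as a stream
  final staticCount = await printAll(Stream.fromIterable(["Apple", "Banana"]));
  // Dynamic stream, handled by exactly the same code
  final dynamicCount = await printAll(dynamicFruits());
  print("Received $staticCount static and $dynamicCount dynamic items");
}
```

Because printAll only depends on the Stream interface, the caller decides where the data comes from.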

2. With Async Generators: async* and yield

The most common way to create dynamic streams is using async generator functions:

  • Declare with async* (indicates return type is Stream)
  • Use yield to emit stream data (each yield adds one item to the stream)
  • Use yield* to forward all events from another stream (nested streams)

// Generate a number stream from 1 to max, emitting one number per second
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // Emit current number
  }
}

// Forward another stream with prefix added
Stream<String> prefixStream(Stream<int> numbers) async* {
  // Use yield* to forward number stream and convert to strings
  yield* numbers.map((number) => "Number: $number");
}

void main() {
  // Create number stream
  Stream<int> numbers = countStream(3);

  // Create prefixed stream
  Stream<String> prefixedNumbers = prefixStream(numbers);

  // Listen to prefixed stream
  prefixedNumbers.listen((data) {
    print(data);
  });
}

// Output (one line per second):
// Number: 1
// Number: 2
// Number: 3

3. Manual Control with StreamController

For more complex scenarios (like manually triggering stream data or converting external events to streams), use StreamController:

import 'dart:async';

void main() {
  // Create stream controller
  final controller = StreamController<String>();

  // Get the controller's stream (for external listening)
  final stream = controller.stream;

  // Listen to the stream
  stream.listen(
    (data) {
      print("Received data: $data");
    },
    onDone: () {
      print("Stream closed");
    },
  );

  // Manually add data to the stream
  controller.add("First message");
  controller.add("Second message");

  // Add final message after 3 seconds and close the stream
  Future.delayed(Duration(seconds: 3), () {
    controller.add("Last message");
    // Close the stream (required, otherwise the stream never completes)
    controller.close();
  });
}

// Output:
// Received data: First message
// Received data: Second message
// (3-second wait)
// Received data: Last message
// Stream closed

StreamController is a versatile tool for creating streams, especially useful for converting callback events (like button clicks or timers) to streams.
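The callback-to-stream conversion mentioned above could look roughly like this. It is a sketch, not a canonical recipe: the helper name ticks is hypothetical, and Timer.periodic stands in for any callback-based event source. The controller's onListen and onCancel callbacks start and stop the timer so that it only runs while someone is listening.

```dart
import 'dart:async';

// Hypothetical helper: wraps the callback-based Timer.periodic in a stream.
Stream<int> ticks(Duration interval) {
  Timer? timer;
  var count = 0;
  late final StreamController<int> controller;
  controller = StreamController<int>(
    // Start the timer only when a listener subscribes
    onListen: () {
      timer = Timer.periodic(interval, (_) => controller.add(count++));
    },
    // Stop the timer when the listener cancels, so nothing keeps running
    onCancel: () => timer?.cancel(),
  );
  return controller.stream;
}

void main() async {
  // take(3) cancels the subscription after three events, which triggers onCancel
  final values = await ticks(Duration(milliseconds: 100)).take(3).toList();
  print(values); // [0, 1, 2]
}
```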

4. Other Common Creation Methods

  • Stream.periodic: Emits data at regular intervals (like a timer)
// Emit current time every 2 seconds
Stream<DateTime> timeStream = Stream.periodic(
  Duration(seconds: 2),
  (count) => DateTime.now(),
);
  • Stream.value: Creates a stream with just one value (similar to Future.value)
Stream<String> singleValueStream = Stream.value("Only one value");
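A short runnable sketch of both constructors follows. Stream.periodic never completes on its own, so the example bounds it with take; the helper name squares and the 200 ms interval are illustrative assumptions.

```dart
// Hypothetical helper: the first [n] square numbers, one every 200 ms.
Stream<int> squares(int n) {
  // take(n) ends the otherwise endless periodic stream after n events
  return Stream<int>.periodic(
    Duration(milliseconds: 200),
    (count) => count * count, // count starts at 0
  ).take(n);
}

void main() async {
  await for (final value in squares(4)) {
    print("Square: $value"); // 0, 1, 4, 9
  }

  // Stream.value emits exactly one item and then completes
  final items = await Stream.value("Only one value").toList();
  print(items); // [Only one value]
}
```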

III. Listening to Streams: Handling Data, Errors, and Completion

Listening to streams is the core operation for working with Streams. The listen method handles three types of events: data events, error events, and completion events.

1. Basic Listening

Stream<int> numberStreamWithError() async* {
  yield 1;
  yield 2;
  // Emit error
  throw Exception("Error occurred during processing");
  yield 3; // Code after error won't execute
}

void main() {
  numberStreamWithError().listen(
    // 1. Handle data events (required)
    (data) {
      print("Received data: $data");
    },
    // 2. Handle error events (optional)
    onError: (error) {
      print("Caught error: ${error.toString()}");
    },
    // 3. Handle completion event (optional)
    onDone: () {
      print("Stream completed");
    },
  );
}

// Output:
// Received data: 1
// Received data: 2
// Caught error: Exception: Error occurred during processing
// Stream completed

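Note: In an async* generator, a thrown exception is delivered to the listener as an error event and then ends the generator, so later yield statements never run and onDone fires right after the error. Streams in general can keep emitting data after an error (for example, a StreamController that calls addError and then add). Pass cancelOnError: true to listen if the subscription should stop at the first error.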
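To see how a listener behaves when the source keeps going after an error, here is a small sketch built on StreamController. The helper runWithError is hypothetical; it records what the listener sees with and without the cancelOnError option of listen.

```dart
import 'dart:async';

// Hypothetical helper: records what a listener sees, in order.
Future<List<String>> runWithError({required bool cancelOnError}) async {
  final log = <String>[];
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) => log.add("data $data"),
    onError: (Object error) => log.add("error $error"),
    onDone: () => log.add("done"),
    cancelOnError: cancelOnError,
  );

  controller.add(1);
  controller.addError("boom"); // An error event; the controller stays open
  controller.add(2);
  // close() completes once the listener has received done or was cancelled
  await controller.close();
  return log;
}

void main() async {
  print(await runWithError(cancelOnError: false));
  // [data 1, error boom, data 2, done]
  print(await runWithError(cancelOnError: true));
  // [data 1, error boom]
}
```

With cancelOnError: true the subscription is cancelled at the first error, so neither the later data nor onDone reaches the listener.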

2. Listening with await for Loop

await for is another way to listen to a stream, with syntax similar to a synchronous loop. If the stream emits an error, the loop stops and the error is thrown at the await for statement, so wrap the loop in try-catch whenever errors are possible:

Stream<String> messageStream() async* {
  yield "Hello";
  await Future.delayed(Duration(seconds: 1));
  yield "World";
}

void main() async {
  try {
    // await for loop continuously receives stream data until stream ends
    await for (String message in messageStream()) {
      print("Message: $message");
    }
    print("Stream processing completed");
  } catch (e) {
    print("Caught error: $e");
  }
}

// Output:
// Message: Hello
// (1-second wait)
// Message: World
// Stream processing completed
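The example above never actually fails, so the catch branch is not exercised. The following sketch shows the failing case; the stream flaky and its StateError are made up for illustration.

```dart
// Hypothetical stream that fails after two values.
Stream<int> flaky() async* {
  yield 1;
  yield 2;
  throw StateError("sensor disconnected");
}

// Returns a log of what happened, in order.
Future<List<String>> consume() async {
  final log = <String>[];
  try {
    await for (final value in flaky()) {
      log.add("value $value");
    }
    log.add("finished"); // Skipped: the error leaves the loop first
  } on StateError catch (e) {
    log.add("caught ${e.message}");
  }
  return log;
}

void main() async {
  print(await consume()); // [value 1, value 2, caught sensor disconnected]
}
```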

3. Two Stream Types: Single-Subscription vs. Broadcast

Dart has two stream types that affect listening behavior significantly:

  • Single-subscription streams:
    • Can be listened to only once, by a single subscriber
    • Data is one-time and can't be retrieved if missed (like network data streams)
    • Most streams are single-subscription by default
  • Broadcast streams:
    • Can be listened to by multiple subscribers simultaneously
    • New subscribers only receive new data after subscription (not historical data)
    • Ideal for event notifications (like button clicks, state changes)

Converting a single-subscription stream to a broadcast stream:

// Generate a number stream from 1 to max, emitting one number per second
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i; // Emit current number
  }
}

void main() {
  // Create single-subscription stream
  Stream<int> singleStream = countStream(3);

  // Convert to broadcast stream
  Stream<int> broadcastStream = singleStream.asBroadcastStream();

  // First subscriber
  broadcastStream.listen((data) => print("Subscriber 1: $data"));

  // Add second subscriber after 1.5 seconds
  Future.delayed(Duration(milliseconds: 1500), () {
    broadcastStream.listen((data) => print("Subscriber 2: $data"));
  });
}

// Output (events arrive one per second):
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 2: 2  (Second subscriber starts receiving from subscription point)
// Subscriber 1: 3
// Subscriber 2: 3
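A broadcast stream can also be created directly with StreamController.broadcast. The sketch below (the function name broadcastDemo is hypothetical) illustrates the "no historical data" rule: an event added before anyone listens is dropped, and a late subscriber only sees events added after it subscribed.

```dart
import 'dart:async';

// Hypothetical demo: returns what each subscriber received, in order.
Future<List<String>> broadcastDemo() async {
  final log = <String>[];
  final controller = StreamController<String>.broadcast();

  controller.add("early"); // No listeners yet, so this event is dropped

  controller.stream.listen((e) => log.add("A got $e"));
  controller.add("first");
  await Future<void>.delayed(Duration.zero); // Let "first" be delivered

  controller.stream.listen((e) => log.add("B got $e")); // Late subscriber
  controller.add("second");

  await controller.close(); // Completes after both listeners receive done
  return log;
}

void main() async {
  print(await broadcastDemo());
  // [A got first, A got second, B got second]
}
```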

IV. Unsubscribing: Avoiding Memory Leaks

Cancel a subscription when it is no longer needed. Otherwise the stream's source (a timer, a controller, a socket) and the listener's callbacks stay alive and keep consuming resources, which can leak memory.

1. Unsubscribing with StreamSubscription

The listen method returns a StreamSubscription object. Call its cancel() method to unsubscribe:

import 'dart:async';

void main() {
  // Create stream that emits data continuously (every 500ms)
  Stream<int> continuousStream = Stream.periodic(
    Duration(milliseconds: 500),
    (count) => count,
  );

  // Subscribe to stream and get subscription object
  StreamSubscription<int> subscription = continuousStream.listen((data) {
    print("Received data: $data");
  });

  // Cancel subscription after 3 seconds
  Future.delayed(Duration(seconds: 3), () {
    print("Canceling subscription");
    subscription.cancel(); // Unsubscribe, stream stops emitting data
  });
}

// Output:
// Received data: 0
// Received data: 1
// Received data: 2
// Received data: 3
// Received data: 4
// Received data: 5  (emitted at roughly the same instant as the cancellation, so it may not always appear)
// Canceling subscription

2. Best Practices for Unsubscribing

  • In StatefulWidget (Flutter): Cancel in dispose method
@override
void dispose() {
  _subscription?.cancel(); // Cancel when widget is destroyed
  super.dispose();
}
  • Limit received items with take: Automatically unsubscribes
// Receive only first 3 items, then automatically unsubscribe
continuousStream.take(3).listen((data) {
  print("Received data: $data");
});
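When the stream is consumed with await for rather than listen, leaving the loop with break (or return) cancels the underlying subscription as well. A minimal sketch, with firstUntil as a hypothetical helper name:

```dart
// Hypothetical helper: collects values until one reaches [limit].
Future<List<int>> firstUntil(Stream<int> source, int limit) async {
  final seen = <int>[];
  await for (final value in source) {
    seen.add(value);
    if (value >= limit) break; // Leaving the loop cancels the subscription
  }
  return seen;
}

void main() async {
  // An endless stream: without the break, the loop would never finish
  final endless = Stream<int>.periodic(
    Duration(milliseconds: 100),
    (count) => count,
  );
  print(await firstUntil(endless, 2)); // [0, 1, 2]
}
```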

V. Stream Transformations: Processing and Converting Stream Data

Streams' power lies in their ability to process data through transformations (like filtering, mapping, merging), similar to collection operation chains.

1. Common Transformation Methods

  • map: Convert data type
Stream<int> numbers = countStream(3);
Stream<String> stringNumbers = numbers.map((n) => "Number $n");
  • where: Filter data
Stream<int> numbers = countStream(5);
Stream<int> evenNumbers = numbers.where(
  (n) => n % 2 == 0,
); // Keep only even numbers
  • take/skip: Take first N items/skip first N items
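// countStream returns a single-subscription stream, so each pipeline needs
// its own stream; listening to the same one twice throws a StateError.
countStream(5).take(3).listen(print); // Outputs 1,2,3
countStream(5).skip(2).listen(print); // Outputs 3,4,5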
  • expand: Expand single item into multiple items
Stream<int> numbers = countStream(2);
// Expand each number into [n, n*10]
Stream<int> expanded = numbers.expand((n) => [n, n * 10]);
expanded.listen(print); // Outputs 1,10,2,20

2. Chained Transformation Example

void main() {
  // Generate number stream from 1~10
  Stream<int> numbers = Stream.fromIterable(List.generate(10, (i) => i + 1));

  // Chained transformations: filter even numbers → multiply by 10 → convert to string
  numbers
      .where((n) => n % 2 == 0) // Keep even numbers: 2,4,6,8,10
      .map((n) => n * 10) // Multiply by 10: 20,40,60,80,100
      .map((n) => "Result: $n") // Convert to string
      .listen((data) => print(data));
}

// Output:
// Result: 20
// Result: 40
// Result: 60
// Result: 80
// Result: 100

VI. Practical Application Scenarios

Streams are widely used in real-world development, especially in these scenarios:

  1. Real-time data updates:
import 'dart:math';

// Simulate real-time stock price updates
Stream<double> stockPriceStream(String symbol) async* {
  final random = Random(); // Create the generator once, not on every tick
  double price = 100.0;
  while (true) {
    await Future.delayed(Duration(seconds: 1));
    // Random price fluctuation in the range [-1, 1)
    price += (random.nextDouble() - 0.5) * 2;
    yield price;
  }
}

  2. File download progress:

// Simulate file download progress (0% ~ 100%)
Stream<int> downloadProgress() async* {
  for (int progress = 0; progress <= 100; progress += 5) {
    await Future.delayed(Duration(milliseconds: 300));
    yield progress;
  }
}
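One way such a progress stream might be consumed is sketched below. The progressBar helper and its 20-character width are illustrative assumptions, and the delay is shortened so the demo finishes quickly.

```dart
// Same shape as the simulated download above, with a shorter delay.
Stream<int> downloadProgress() async* {
  for (int progress = 0; progress <= 100; progress += 5) {
    await Future.delayed(Duration(milliseconds: 50));
    yield progress;
  }
}

// Hypothetical helper: renders a percentage as a text progress bar.
String progressBar(int percent, {int width = 20}) {
  final filled = (percent * width) ~/ 100;
  return "[${'#' * filled}${'.' * (width - filled)}] $percent%";
}

void main() async {
  await for (final percent in downloadProgress()) {
    print(progressBar(percent));
  }
  print("Download complete");
}
```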

  3. Event listening:

// Convert button click events to stream (Flutter example)
StreamController<void> _buttonClicks = StreamController<void>();

// Called when button is tapped
void _onButtonTap() {
  _buttonClicks.add(null); // Emit click event
}

// Listen to click events
_buttonClicks.stream.listen((_) {
  print("Button clicked");
});
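Outside Flutter, the same pattern can be tried in plain Dart. The sketch below uses a hypothetical ClickSource class in place of a widget; it uses a broadcast controller, on the assumption that several parts of an app may want to observe the same clicks, and closes the controller when the owner is disposed.

```dart
import 'dart:async';

// Hypothetical stand-in for a widget that exposes its taps as a stream.
class ClickSource {
  // Broadcast, so several listeners can observe the same clicks
  final StreamController<void> _clicks = StreamController<void>.broadcast();

  Stream<void> get clicks => _clicks.stream;

  void tap() => _clicks.add(null); // Emit a click event

  // Close the controller when the owner is disposed
  Future<void> dispose() => _clicks.close();
}

// Simulates [taps] clicks and returns how many the listener counted.
Future<int> countTaps(int taps) async {
  final source = ClickSource();
  var count = 0;
  source.clicks.listen((_) => count++);
  for (var i = 0; i < taps; i++) {
    source.tap();
  }
  await source.dispose(); // Pending clicks are delivered before the stream closes
  return count;
}

void main() async {
  print("Clicks counted: ${await countTaps(3)}"); // Clicks counted: 3
}
```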
