DEV Community

Guilherme Silva

Dart's future.asStream() quick overview

Dart makes it easy to transform a future into a stream, but what happens after that? For example, if we cancel the stream's subscription, what happens to the future? And what if the future throws an exception? Can a try-catch block help?

Intro

Problem: in Dart, you can't cancel (or stop) a future.

Possible solution: you can cancel a stream's subscription. So the idea is to transform the future into a stream with asStream() and cancel the subscription when needed.

We need to understand what happens when the future:

  • completes normally
    • before the stream is canceled
    • after the stream was already canceled
  • throws
    • before the stream is canceled
      • onError is defined
        • and cancelOnError is false (default)
        • and cancelOnError is true
      • onError is NOT defined
    • after the stream is canceled

Situation 1 - the future completes normally, before the stream is canceled

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() is still called after an error
      // cancelOnError: true, // onDone() is NOT called after an error
    );
    // await sub.cancel();
    // print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this never happens, even if we don't define onError();
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  // throw Exception('HEHE');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
future is STILL running
future has FINISHED
  stream.onData(): future finished with value: true
  stream.onDone(): done
Exited

conclusions:

  • both onDone and onData callbacks are called
  • onData is called with the future's result

Situation 2 - the future completes normally, but the stream was already canceled

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() is still called after an error
      // cancelOnError: true, // onDone() is NOT called after an error
    );
    await sub.cancel();
    print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this never happens, even if we don't define onError();
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  // throw Exception('HEHE');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
   stream.cancel() was ALREADY cancelled!
future is STILL running
future has FINISHED
Exited

conclusions:

  • the future continues running, even after we canceled the stream
  • neither onDone nor onData is called (the result is completely ignored)
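
Since canceling the subscription only stops delivery of the result, the work itself has to cooperate if it should actually stop early. Here is a minimal sketch of that idea. CancelFlag and cancellableCall are hypothetical names invented for this illustration (they are not part of dart:async), and the delays are shortened so the demo finishes quickly.

```dart
import 'dart:async';

// Hypothetical cancellation flag; not part of dart:async.
class CancelFlag {
  bool isCancelled = false;
}

// Hypothetical variant of longAPICall() that checks the flag between steps.
Future<bool> cancellableCall(CancelFlag flag, List<String> log) async {
  log.add('started');
  await Future.delayed(Duration(milliseconds: 50));
  if (flag.isCancelled) {
    // The caller lost interest, so skip the remaining work.
    log.add('stopped early');
    return false;
  }
  await Future.delayed(Duration(milliseconds: 50));
  log.add('finished');
  return true;
}

Future<List<String>> runDemo() async {
  final log = <String>[];
  final flag = CancelFlag();
  final sub = cancellableCall(flag, log).asStream().listen(
        (v) => log.add('onData: $v'),
      );
  flag.isCancelled = true; // tells the work to stop at its next check
  await sub.cancel(); // stops delivery of the result to the listener
  // Give the future enough time to reach its check.
  await Future.delayed(Duration(milliseconds: 200));
  return log;
}

Future<void> main() async {
  (await runDemo()).forEach(print);
}
```

The flag only helps at the points where the function checks it; anything already awaited (such as a real network call) still runs to completion.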

Situation 3 - throws before the stream is canceled, onError is defined and cancelOnError is false (default)

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      onError: (e) => print('  stream.onError(): ${e.toString()}'),
      cancelOnError: false, // default - onDone() is still called after an error
      // cancelOnError: true, // onDone() is NOT called after an error
    );
    // await sub.cancel();
    // print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this never happens, even if we don't define onError();
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));

  throw Exception('HEHE');

  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
future is STILL running
  stream.onError(): Exception: HEHE
  stream.onDone(): done
Exited

conclusions:

  • both onDone and onError callbacks are called

Situation 4 - throws before the stream is canceled, onError is defined, cancelOnError is true

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() is still called after an error
      cancelOnError: true, // onDone() is NOT called after an error
    );
    // await sub.cancel();
    // print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this never happens, even if we don't define onError();
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  throw Exception('HEHE');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
future is STILL running
  stream.onError(): Exception: HEHE
Exited

conclusions:

  • the onError callback is called
  • onDone is NOT called
  • the catch block is NOT reached
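
The catch block is never reached because listen() returns immediately and the error is delivered later, outside the try block. If you do want try-catch to see the error, one option is to await something tied to the subscription. The sketch below uses StreamSubscription.asFuture(), which replaces the subscription's onDone and onError handlers with ones that complete the returned future. failingCall and listenAndAwait are hypothetical names used only for this illustration.

```dart
import 'dart:async';

// Hypothetical stand-in for longAPICall(), shortened so the demo is quick.
Future<bool> failingCall() async {
  await Future.delayed(Duration(milliseconds: 100));
  throw Exception('HEHE');
}

Future<String> listenAndAwait() async {
  final sub = failingCall().asStream().listen(
        (v) => print('onData: $v'),
      );
  try {
    // asFuture() overwrites onDone/onError on the subscription, so the
    // stream's error now surfaces at this await.
    await sub.asFuture<void>();
    return 'completed without error';
  } catch (e) {
    return 'caught: $e';
  }
}

Future<void> main() async {
  print(await listenAndAwait());
}
```

The trade-off is that any onError or onDone callback passed to listen() is no longer called once asFuture() has been invoked.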

Situation 5 - throws before the stream is canceled, onError is NOT defined

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      // onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() is still called after an error
      // cancelOnError: true, // onDone() is NOT called after an error
    );
    // await sub.cancel();
    // print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this never happens, even if we don't define onError();
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  throw Exception('HEHE');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
future is STILL running
  stream.onDone(): done
Unhandled exception:
Exception: HEHE
#0      longAPICall
bin/future_playground.dart:29
<asynchronous suspension>
#1      new Stream.fromFuture.<anonymous closure> (dart:async/stream.dart:247:17)
<asynchronous suspension>

Exited (255)

conclusions:

  • onDone is still called (because cancelOnError defaults to false)
  • try-catch is ignored (aahá! 👀)
  • the exception ends up in the Zone's uncaught-error handling (here it is reported as an unhandled exception and the program exits with code 255)
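
If the error goes to the zone, we can also install our own zone handler instead of letting the program crash. Below is a minimal sketch using runZonedGuarded from dart:async. failingCall and runGuarded are hypothetical names for this illustration, and events are collected in a list so they are easy to inspect.

```dart
import 'dart:async';

// Hypothetical stand-in for longAPICall(), shortened so the demo is quick.
Future<bool> failingCall() async {
  await Future.delayed(Duration(milliseconds: 100));
  throw Exception('HEHE');
}

Future<List<String>> runGuarded() async {
  final log = <String>[];
  runZonedGuarded(() {
    // Both the future and the listener are created inside the guarded zone.
    failingCall().asStream().listen(
          (v) => log.add('onData: $v'),
          onDone: () => log.add('onDone'),
          // no onError on purpose: the error is reported to the zone
        );
  }, (error, stack) => log.add('zone caught: $error'));
  // Wait long enough for the future to fail and the events to be delivered.
  await Future.delayed(Duration(milliseconds: 300));
  return log;
}

Future<void> main() async {
  (await runGuarded()).forEach(print);
}
```

With the handler in place, the error is passed to the callback given to runZonedGuarded instead of being reported as an unhandled exception, and onDone is still called.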

Situation 6 - the future throws after the stream is canceled

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      // onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() is still called after an error
      // cancelOnError: true, // onDone() is NOT called after an error
    );
    await sub.cancel();
    print('   stream was ALREADY cancelled!');
  } catch (e) {
    // this never happens, even if we don't define onError();
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  throw Exception('HEHE');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
   stream was ALREADY cancelled!
future is STILL running
Exited

conclusions:

  • the future continues running even after we canceled the stream
  • try-catch is ignored
  • onError was not declared, and it didn't make a difference: the error is simply ignored and is never sent to the zone's handler (aahá(2)! 👀)
