DEV Community

Cover image for Procesos Asíncronos. Streams en dart.
Pablo L
Pablo L

Posted on • Updated on

Procesos Asíncronos. Streams en dart.

Introducción

Comprender la forma en como dart trata los procesos asíncronos y entender los streams puede costar al principio porque se alejan un poquito de la programación tradicional, pero no es algo difícil y una vez comprendida la foto te darás cuenta de su utilidad.

Voy a tratar en este artículo de ser lo más claro y sencillo posible. Sin presuponer conocimientos previos de la materia en dart.

Streams

En la mayoría de los tutoriales de dart a la hora de definir un Stream suelen comparar un Stream con una cinta transportadora en donde dejamos datos al principio de la cinta para recogerlos al final de la misma más tarde.

Efectivamente en nuestro programa un proceso de ejecución como por ejemplo puede ser una entrada de texto de usuario o la respuesta a un request HTTP, deja datos en un Stream. Otro proceso de ejecución recoge esos datos de este Stream y los procesa.

Cuando digo que otro proceso de ejecución recoge esos datos te puedes preguntar de que proceso estoy hablando. Ese proceso o procesos los habrás creado previamente, se llaman listeners y son configurados para "escuchar" lo que pasa en ese stream (aviso de inicio datos, fin de datos, errores) y por supuesto para recoger los datos que otro proceso dejó.

Dart es un lenguaje de programación reactiva y en nuestro programas reaccionamos antes los datos. En otros lenguajes como por ejemplo Java, en las operaciones de E/S (que siempre son asíncronas) se bloquea el hilo de ejecución para "facilitar" las cosas. Por ejemplo en Java o C, cuando leemos de un fichero le decimos a nuestro programa algo así como:

Lee este fichero pero cuando tengas 1024 bytes te paras y proceso los datos ahora

En dart con programación reactiva y los streams suena distinto...le decimos algo como:

Lee este fichero y me vas soltando los datos en este Stream, algún listener procesará los datos

No sé si la exposición es muy académica pero quiero que sea entendible :) En dart hay que configurar como reaccionar al cambio en los datos. No quiero entrar en si la programación reactiva de dart es mejor o no que la programación tradicional.

Entendiendo los Streams desde el código

Vamos a bajar un poquito al código dart. Vamos a ver clases y métodos relacionados para entender el concepto.

  1. StreamController es la clase principal implicada en los streams. Creamos streams desde esta clase. El constructor usa el tipo de dato que utilizaremos en nuestro stream.

  2. stream es un getter de la clase StreamController para obtener su Stream

  3. listen(dynamic data) método de la clase Stream para escuchar lo que pasa en el mismo. En principio solo puede haber un escuchador por Stream aunque veremos que fácilmente podemos configurarlo para que existan varios.

El tipo dynamic puede ser cualquier tipo pero coincidirá con el tipo T indicado en el constructor parametrizado de StreamController mencionado en el punto 1.

  1. sink getter de la clase StreamController para obtener la instancia del Sink del stream. El Sink es donde poner los datos

  2. add() método de la clase Sink para añadir un dato al stream.

Ejemplo mínimo

Vamos a poner el ejemplo más ridículo y breve posible que he podido desarrollar para que no te pierdas los conceptos clave.

import 'dart:async';

void main() {
  StreamController controller=new StreamController<int>();

  controller.stream.listen((dynamic data){
    print(data);    
  });

  controller.sink.add(2);
}
Enter fullscreen mode Exit fullscreen mode

El ejemplo representa bastante bien los conceptos mencionados. Crear StreamController, crear listener, añadir dato, captura de datos. Lo vemos.

Hemos creado una instancia de la clase StreamController:

StreamController controller=new StreamController<int>();
Enter fullscreen mode Exit fullscreen mode

Hemos creado un listener que capturan los datos añadididos al stream :

StreamController controller=new StreamController<int>();
Enter fullscreen mode Exit fullscreen mode

Hemos añadido una dato de tipo int al Stream:

controller.sink.add(2);
Enter fullscreen mode Exit fullscreen mode

Capturar errores, fin de datos

Además de recoger los datos para su proceso deberíamos capturar los posibles errores de nuestro Stream. Imaginemos que queremos hacer un request a una página de internet. Es posible que haya un error de red durante la transmisión o quizás la página este caída. Deberíamos poder informar de estas situaciones y cerrar la conexión.

Mediante el método handleError(Object error) de la clase Stream capturamos el error. Para añadirlo utilizamos el método handleError(Object error).

También puede ser útil controlar cuando ya se ha cerrado el stream y no va a haber más datos. En este caso usamos el método onDone(). El fin de datos se produce cuando el stream lo cerramos con el método **close()*

Lo vemos con un ejemplo. Vamos a añadir al código anterior la captura de errores, control de fin de datos.

import 'dart:async';

void main() {
  StreamController controller = new StreamController<int>();


  controller.stream.listen(
          (data) {
        print('Data: $data');
      },
      onError: (err) {
        print('Error!');
        controller.sink.close();
      },
      onDone: () {
        print('Done!');
      }
  );

  controller.sink.add(2);
  controller.sink.addError("error");
}
Enter fullscreen mode Exit fullscreen mode

El resultado de la ejecución es:

2
error
Done!

Más de un listener

Si en el código anterior intentamos añadir un listener:

import 'dart:async';

void main() {
  StreamController controller = new StreamController<int>();

  controller.stream.listen((data) {
    print('Data: $data');
  });

  controller.stream.listen((data) {
    print('Data: $data');
  });

  controller.sink.add(2);
  controller.sink.addError("error");
  controller.sink.close();
}
Enter fullscreen mode Exit fullscreen mode

Veremos que obtenemos un error similar a este:

Unhandled Exception: Bad state: Stream has already been listened to

Efectivamente no podemos tener más de un listener en nuestro stream, pero mediante el constructor broadcast() del StreamController si que podemos.

import 'dart:async';

void main() {
  StreamController controller = new StreamController<int>.broadcast();

  controller.stream.listen((data) {
    print('Data: $data');
  });

  controller.stream.listen((data) {
    print('Data: $data');
  });

  controller.sink.add(2);
}
Enter fullscreen mode Exit fullscreen mode

Otra forma de crear streams, async* y yield

Imaginemos que tenemos una serie de valores que queremos añadir a un stream de una forma sistemática como por ejemplo números aleatorios o una serie de números primos. En estos casos en lugar de ir añadiendo al stream número a número mediante el método add(), hay otra forma muy elegante. Vamos al ejemplo para y luego te explico conceptos.

import 'dart:async';

bool isPrimo(n) {
  for (int i = 1; i < n; i++) {
    if (n % i == 0 && i!=1) {
      return false;
    }
  }

  return true;
}

Stream<int> generaPrimos(n) async* {
  for (int i = 1; i <= n; i++) {
    if (isPrimo(i)) {
      yield i;
    }
  }
}


void main() {
  generaPrimos(100).listen((data){
    print (data);
  });
}
Enter fullscreen mode Exit fullscreen mode

Vamos a centrarnos en esta función generadora de valores:

Stream<int> generaPrimos(n) async* {
  for (int i = 1; i <= n; i++) {
    if (isPrimo(i)) {
      yield i;
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Esta función generadora de números primos termina con un operador clave async*. Cuando terminamos una función con async* sucede lo siguiente:

  1. Al igual que async, convierte la función es asíncrona.
  2. Nos obliga a retornar un tipo Stream.
  3. Nos da la oportunidad de utilizar un comando muy útil y elegante para usar en listas de datos. Se llama yield.

El comando yield retorna un valor pero a diferencia del comando return normal, no sale de la función sino que continua con el flujo de esta. En el ejemplo, la función asíncrona generaPrimo, cuando encuentra un número primo retorna ese número pero continúa en la función a la búsqueda del siguiente.

Conclusión

Tengo mucho experiencia en programación tradicional principalmente en Java y poca en programación reactiva así que cuando conocí dart y su forma de tratar la asincronía, streams, futures etc...pensé...pero como no se inventó esto antes? :)

Espero que te haya gustado el artículo!!! Como admirador del lenguaje dart A mi me encantó escribirlo!!! Viva dart!!! :)

En breve redactaré otro sobre futures en dart que está muy relacionado.

Muchas gracias!!
(PLM)

Top comments (0)