DEV Community

David Goyes
David Goyes

Posted on

Combine #14: Practicando share()

El ejercicio consiste en descargar una entrada tipo Story del endpoint: https://hacker-news.firebaseio.com/v0/item/<ID>.json. Para esto se usó el siguiente bloque de código.

Es importante destacar el uso de .receive(on: apiQueue) antes de .decode(...) para enviar la respuesta de la petición a otra cola antes de empezar la codificación, y luego .receive(on: DispatchQueue.main) para mandar al hilo principal antes de cerrar el pipeline. Por otro lado, debido a que .decode(...) puede arrojar un error de tipo indeterminado, se pone el operador .catch { _ in Fail(error: .invalidResponse) } para capturar ese error indeterminado y devolver un Publisher que termina inmediatamente con el error especificado por parámetro.

Nota: Se usa attributes: .concurrent para definir apiQueue porque más adelante se va a necesitar hacer varias consultas de forma concurrente, así que conviene decodificar también de forma concurrente.

private let apiQueue = DispatchQueue(label: "API", qos: .default, attributes: .concurrent)
func story(id: Int) -> any Publisher<Story, Error> {
  URLSession.shared
    .dataTaskPublisher(for: EndPoint.story(id).url)
    .receive(on: apiQueue)
    .map(\.data)
    .decode(type: Story.self, decoder: decoder)
    .catch { _ in Fail(error: .invalidResponse) }
    .receive(on: DispatchQueue.main)
}
Enter fullscreen mode Exit fullscreen mode

Haciendo varias peticiones, integrando publishers

Con base en el ejercicio anterior, se pidió agregar una funcionalidad que permitiera descargar varias entradas, a partir de un arreglo de identificadores, respetando el límite máximo de descargas. Se puede alcanzar el resultado por dos caminos: usando flatMap y usando una especie de merge iterativo.

Con flatMap se crea una colección de Publishers y se retorna una sola suscripción, que emite los valores retornados por los Publishers. Para administrar mejor los recursos, se puede poner un límite con maxPublishers.

func mergedStories(ids storyIDs: [Int]) -> any Publisher<Story, Error> {
  storyIDs
    .publisher
    .flatMap(maxPublishers: .max(maxStories)) { storyID in
      story(id: storyID)
    }
    .eraseToAnyPublisher()
}
Enter fullscreen mode Exit fullscreen mode

Aunque la implementación de story(id:) funcionaba perfectamente de forma aislada, flatMap necesitaba retornar tipos concretos y no any Publisher. Por esta razón, tuve que modificar la firma de story(id:) para devolver AnyPublisher y también usar .eraseToAnyPublisher() al final.

func story(id: Int) -> AnyPublisher<Story, Error> {
  URLSession.shared
    ...
    .receive(on: DispatchQueue.main)
    .eraseToAnyPublisher()
}
Enter fullscreen mode Exit fullscreen mode

La aproximación que usa merge() primero extrae las primeras maxStories del arreglo recibido. Luego, se usa reduce sobre ese arreglo para devolver un solo Publisher. La semilla es un Empty que termina inmediatamente - lo que garantiza el fin del Publisher en caso de que el arreglo esté vacío. Luego, en el closure del reduce, se recibe el Publisher combinado y el id de la siguiente historia, con lo que se devuelve un Publisher con combined.merge(with: ... ). Es de señalar que Empty también debe convertirse en AnyPublisher porque, de no hacerlo, el reduce tendría que devolver un Empty. Asímismo, merge también debe convertirse en AnyPublisher porque, de no hacerlo, devolvería un Publishers.MergeMany.

func manuallyMergedStories(ids storyIDs: [Int]) -> any Publisher<Story, Error> {
  let storyIDs = Array(storyIDs.prefix(maxStories))

  return storyIDs.reduce(
    Empty<Story, Error>(completeImmediately: true)
      .eraseToAnyPublisher(),
    { combined, id in
      combined
        .merge(with: story(id: id))
        .eraseToAnyPublisher()
    })
}
Enter fullscreen mode Exit fullscreen mode

Esta implementación de merge con reduce es muy útil porque la versión nativa solo permite combinar hasta ocho Publishers.

Obteniendo la última de las historias

El endpoint: https://hacker-news.firebaseio.com/v0/newstories.json trae las últimas historias. El siguiente bloque de código sirve para descargar todas las historias de forma iterativa:

func stories() -> AnyPublisher<[Story], Error> {
  return URLSession.shared
    .dataTaskPublisher(for: EndPoint.stories.url) // 1
    .map(\.data) // 2
    .decode(type: [Int].self, decoder: decoder) // 3
    .mapError { error -> API.Error in // 4
      switch error {
      case is URLError:
        return Error.addressUnreachable(EndPoint.stories.url)
      default:
        return Error.invalidResponse(EndPoint.stories.url)
      }
    }
    .flatMap({ // 5
      self.mergedStories(ids: $0)
    })
    .scan([Story](), { result, story in // 6
      result + [story]
    })
    .map { $0.sorted() } // 7
    .receive(on: DispatchQueue.main) // 8
    .eraseToAnyPublisher()
}
Enter fullscreen mode Exit fullscreen mode

En el código anterior se tiene lo siguiente:

  1. Se crea el dataTaskPublisher para descargar la lista de historias recientes.
  2. Se extrae solo el atributo data de la respuesta de tipo Data. Hasta aquí tengo un Publisher<Data, URLError>.
  3. Se decodifica el Data en [Int]. Hasta aquí hay un Publisher<[Int], any Error>.
  4. Mapeo los errores con mapError para no tener any Error sino API.Error. Hasta acá tengo Publisher<[Int], API.Error>.
  5. Tomo el arreglo de ids y se lo paso al método mergedStories(ids:) que, dado un [Int], me retorna Publisher<Story, Error>. Si usara map, entonces Publisher<[Int], API.Error> se convertiría en Publisher<Publisher<Story, Error>, API.Error>, así que necesito usar flatMap para obtener Publisher<Story, Error>.
  6. Quiero actualizar un List cada vez que obtenga una nueva historia, así que voy a emitir un arreglo con todas las historias recibidas cada vez que reciba una historia usando span(). Hasta acá tengo Publisher<[Story], API.Error>.
  7. Quiero entregar el arreglo de historias ordenado, así que uso .map { $0.sorted() }.
  8. Como voy a pintar el resultado en la interfaz gráfica, conviene volver al hilo principal con .receive(on: DispatchQueue.main).

Cuestionario

1. ¿Para qué se usa .receive(on: apiQueue) antes de .decode(...) en el pipeline? ✅

  • [ ] Para forzar que la decodificación ocurra en el hilo principal.
  • [ ] Para ejecutar la decodificación en una cola concurrente y no bloquear el hilo principal. 
  • [ ] Para asegurar que la respuesta se reciba en orden.
  • [ ] Para convertir el tipo de error a API.Error.

2. ¿Por qué se usa .catch { _ in Fail(error: .invalidResponse) } en el método story(id:)? ✅

  • [ ] Para reintentar la petición en caso de error.
  • [ ] Para ignorar el error y devolver un valor vacío.
  • [ ] Para capturar un error de tipo indeterminado y emitir un Publisher que falla inmediatamente con un error controlado. 
  • [ ] Para cancelar todas las suscripciones activas.

3. ¿Cuál es la razón para usar attributes: .concurrent al crear apiQueue? ✅

  • [ ] Permitir que se decodifiquen varias respuestas simultáneamente. 
  • [ ] Evitar bloqueos causados por DispatchQueue.main.
  • [ ] Garantizar la ejecución secuencial de las tareas de red.
  • [ ] Hacer que URLSession sea más rápido al usar hilos múltiples.

4. ¿Qué ventaja ofrece flatMap(maxPublishers:) al descargar varias historias? ✅

  • [ ] Combina los valores en una sola lista ordenada automáticamente.
  • [ ] Permite limitar cuántos Publishers se ejecutan en paralelo. 
  • [ ] Convierte los errores en any Error.  D) Detiene el flujo si una descarga falla.

5. ¿Por qué fue necesario cambiar la firma de story(id:) para devolver AnyPublisher? ✅

  • [ ] Porque flatMap requiere Publishers con tipos concretos, no existenciales como any Publisher
  • [ ] Porque eraseToAnyPublisher() no se puede aplicar a Publishers genéricos.
  • [ ] Porque story(id:) ya no devolvía valores.
  • [ ] Para poder usar merge(with:) más adelante.

6. ¿Cuál es la función del Empty<Story, Error> usado como semilla en la implementación con reduce? ✅

  • [ ] Emitir un valor inicial vacío.
  • [ ] Garantizar que el Publisher termine correctamente incluso si el arreglo está vacío. 
  • [ ] Convertir los valores en Optional.
  • [ ] Repetir la última historia emitida.

7. ¿Por qué es necesario usar .eraseToAnyPublisher() al combinar Publishers con merge(with:) dentro del reduce? ✅

  • [ ] Porque merge devuelve un Publishers.MergeMany, y se quiere un tipo uniforme (AnyPublisher). 
  • [ ] Para evitar la ejecución concurrente.
  • [ ] Porque merge no puede emitir errores.
  • [ ] Para cambiar el tipo de error del Publisher.

8. ¿Qué diferencia práctica tiene el enfoque con merge() frente a la implementación manual con reduce() para obtener varias historias? ✅

  • [ ] merge() permite combinar más de ocho Publishers, mientras que flatMap() no.
  • [ ] merge() se limita a ocho Publishers simultáneos, mientras que reduce() puede manejar más.
  • [ ] merge() maneja errores automáticamente.
  • [ ] reduce() solo puede trabajar con tipos concretos.

9. En el método stories(), ¿por qué se usa .mapError después de .decode? ✅

  • [ ] Para transformar todos los errores en URLError.
  • [ ] Para convertir el error genérico en uno de tipo API.Error manejable. 
  • [ ] Para evitar el uso de .catch.
  • [ ] Para eliminar errores antes de continuar el pipeline.

10. ¿Cuál es la función del operador .scan([Story](), { result, story in ... }) dentro de stories()? ✅

  • [ ] Emitir solo la primera historia recibida.
  • [ ] Contar cuántas historias se han recibido.
  • [ ] Acumular y emitir un arreglo con todas las historias recibidas hasta el momento. 
  • [ ] Ordenar las historias en cada paso.

Solución

1. ¿Para qué se usa .receive(on: apiQueue) antes de .decode(...) en el pipeline?

  • [✅] Para ejecutar la decodificación en una cola concurrente y no bloquear el hilo principal.

2. ¿Por qué se usa .catch { _ in Fail(error: .invalidResponse) } en el método story(id:)?

  • [✅] Para capturar un error de tipo indeterminado y emitir un Publisher que falla inmediatamente con un error controlado.

3. ¿Cuál es la razón para usar attributes: .concurrent al crear apiQueue?

  • [✅] Permitir que se decodifiquen varias respuestas simultáneamente.

4. ¿Qué ventaja ofrece flatMap(maxPublishers:) al descargar varias historias?

  • [✅] Permite limitar cuántos Publishers se ejecutan en paralelo.

5. ¿Por qué fue necesario cambiar la firma de story(id:) para devolver AnyPublisher?

  • [✅] Porque flatMap requiere Publishers con tipos concretos, no existenciales como any Publisher.

6. ¿Cuál es la función del Empty<Story, Error> usado como semilla en la implementación con reduce?

  • [✅] Garantizar que el Publisher termine correctamente incluso si el arreglo está vacío.

7. ¿Por qué es necesario usar .eraseToAnyPublisher() al combinar Publishers con merge(with:) dentro del reduce?

  • [✅] Porque merge devuelve un Publishers.MergeMany, y se quiere un tipo uniforme (AnyPublisher).

8. ¿Qué diferencia práctica tiene el enfoque con merge() frente a la implementación manual con reduce() para obtener varias historias?

  • [✅] merge() se limita a ocho Publishers simultáneos, mientras que reduce() puede manejar más.

9. En el método stories(), ¿por qué se usa .mapError después de .decode?

  • [✅] Para convertir el error genérico en uno de tipo API.Error manejable.

10. ¿Cuál es la función del operador .scan([Story](), { result, story in ... }) dentro de stories()?

  • [✅] Acumular y emitir un arreglo con todas las historias recibidas hasta el momento.

Top comments (0)