Publisher
Un emisor (Publisher) puede trasmitir cero o más valores a uno o más suscriptores, y un solo evento de fin que puede ser éxito o error. Una vez emitido el evento de fin, el emisor no puede volver a transmitir eventos.
Los API anteriores para manejar llamados asíncronos pueden adaptarse al patrón de implementación propuesto por Combine. Por ejemplo, se puede crear un emisor para una notificación del NotificationCenter como se muestra a continuación:
let myNotification = Notification.Name("MyNotification")
let center = NotificationCenter.default
let publisher = center.publisher(for: myNotification)
No obstante, tener en cuenta que el código anterior no hará nada, porque el emisor NECESITA un suscriptor.
Subscriber
Un suscriptor (Subscriber) define el comportamiento a aplicar ante la salida ("downstream" ) del emisor (Publisher).
Suscripción con sink(::)
Continuando el ejemplo anterior (de "Publisher")
let suscription = publisher
.sink { value in
print("Notification received from a publisher with value: \(value)")
}
center.post(name: myNotification, object: nil)
center.post(name: myNotification, object: nil)
suscription.cancel()
Se crea una suscripción con sink sobre el emisor (Publisher), se emite una notificación en el NotificationCenter y luego se cancela la suscripción.
La suscripción creada con sink va a recibir todos los valores emitidos por el emisor, lo que se conoce como "demanda ilimitada" ("unlimited demand").
sink también puede procesar los eventos de fin. Por ejemplo:
let just = Just("Hello world!")
_ = just
.sink(receiveCompletion: {
print("Received completion", $0)
}, receiveValue: {
print("Received value", $0)
})
_ = just
.sink(receiveCompletion: {
print("Received completion (another)", $0)
}, receiveValue: {
print("Received value (another)", $0)
})
// Received value Hello world!
// Received completion finished
// Received value (another) Hello world!
// Received completion (another) finished
Just es un emisor que emite un solo valor a cada suscriptor y luego finaliza.
Suscripción con assign(to:on:)
El operador assign(to:on:) permite asignar el valor recibido a una propiedad que admita KVO de un objeto.
class SomeObject {
var value: String = "" {
didSet {
print(value)
}
}
}
let object = SomeObject()
let publisher = ["Hello", "world!"].publisher
_ = publisher
.assign(to: \.value, on: object)
En este caso se crea un emisor que emite los elementos del arreglo ["Hello", "world!"] como una secuencia. Luego, se asigna cada elemento emitido a la propiedad value de object.
Suscripción con assign(to:)
El operador assign(to:) permite "re-emitir" el valor recibido a través de otro emisor, particularmente útil si este otro emisor es una propiedad marcada con @Published.
class SomeObject {
@Published var value = 0
}
let object = SomeObject()
object.$value
.sink {
print($0)
}
(0..<10).publisher
.assign(to: &object.$value)
Este operador es especialmente útil para no generar ciclos de retención como sí ocurre a continuación:
class MyObject {
@Published var word: String = ""
var subscriptions = Set<AnyCancellable>()
init() {
["A", "B", "C"].publisher
.assign(to: \.word, on: self)
.store(in: &subscriptions)
}
}
Cancellable
Las suscripciones (emisor + operadores + suscriptor) retornan una instancia de AnyCancellable (que conforma el protocolo Cancellable), que funciona como token para cancelar la suscripción y así eliminar los recursos cuando no se quiere recibir más valores de un emisor.
Si no se mantiene una referencia a la suscripción, esta se cancelará tan pronto el "scope" termine.
Los recursos se liberan tanto cuando se pierde la referencia en memoria al Cancellable, como si se llama el método cancel().
Entendiendo qué pasa
Publisher
Un suscriptor (Subscriber) invocará subscribe(_:) en un emisor, que a su vez invocará receive(subscriber:) para conectar el suscriptor al emisor.
public protocol Publisher {
associatedtype Output
associatedtype Failure : Error
func receive<S>(subscriber: S)
where S: Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
extension Publisher {
public func subscribe<S>(_ subscriber: S)
where S : Subscriber,
Self.Failure == S.Failure,
Self.Output == S.Input
}
Subscriber
Un emisor (Publisher) invocará receive(subscription:) en el suscriptor para darle la suscripción, receive(_:) para mandarle un nuevo valor recién publicado y receive(completion:) para informar que ya no hay más valores, sea por fin exitoso o por error.
public protocol Subscriber: CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: Subscription)
func receive(_ input: Self.Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Self.Failure>)
}
Subscription
Una suscripción (Subscription) es la conexión entre el emisor y el suscriptor. El suscriptor usa request(_:) para informar que quiere recibir más valores, hasta un número máximo o ilimitado. A esto se lo conoce como "BACKPRESSURE MANAGEMENT". Sin esto, el suscriptor podría inundarse, recibiendo más valores de los que podría procesar.
public protocol Subscription: Cancellable,
CustomCombineIdentifierConvertible {
func request(_ demand: Subscribers.Demand)
}
subscription.request(_:) fija el número máximo de valores que el suscriptor está dispuesto a recibir. Sin embargo, este número se puede ajustar cada vez que se recibe un nuevo valor.
Creando un suscriptor personalizado
Definir un suscriptor personalizado IntSubscriber que puede recibir Int y nunca recibe errores. Se define receive(subscription:), especificando que el suscriptor puede recibir hasta tres valores en la suscripción. En receive(_ input:) se imprime cada valor recibido y se retorna .none que significa que no se va a ajustar la demanda. En receive(completion:) se imprime el evento de fin.
class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(3))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}
Con el siguiente código, se crea un emisor y se le adjunta el suscriptor. Como en receive(subscription:) se limitó a tres valores, entonces el emisor solo envía los primeros tres valores y se queda esperando a que el suscriptor le pida más.
let subscriber = IntSubscriber()
let publisher = (1...6).publisher
publisher.subscribe(subscriber)
// Received value 1
// Received value 2
// Received value 3
Si, en cambio, se retorna .unlimited en receive(_ input:), se obtiene al final el mensaje "Received completion finished". Se consigue el mismo resultado al retornar .max(1) en receive(_ input:).
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .unlimited // .max(1)
}
// Received value 1
// Received value 2
// Received value 3
// Received value 4
// Received value 5
// Received value 6
// Received completion finished
Future
Just sirve para crear un emisor que emite un solo valor a un suscriptor y luego termina. Un Future hace lo mismo (producir un valor y terminar), pero de forma asíncrona.
Considerar el siguiente código:
// 1
func futureIncrement(integer: Int, afterDelay delay: TimeInterval) -> Future<Int, Never> {
Future<Int, Never> { promise in
print("Original") // 6
DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
print("Terminé") // 7
promise(.success(integer + 1))
}
}
}
// 2
let future = futureIncrement(integer: 1, afterDelay: 1)
// 3
future
.sink(receiveCompletion: {
print("First", $0)
}, receiveValue: {
print("First", $0)
})
.store(in: &subscriptions) // 5
// 4
future
.sink(receiveCompletion: {
print("Second", $0)
}, receiveValue: {
print("Second", $0)
})
.store(in: &subscriptions) // 5
Aquí pasa lo siguiente:
- Existe una función que retorna un
Future. ElFutureconsiste de un closure que recibe una promesa, que recibe unResult<Output, Failure>y retornaVoid. Observar que dentro del closure hay cierto código síncronoprint("Original")y otro asíncrono (envuelto en GCD). Una vez termine la ejecución asíncrona se invoca la promesa con su valor.sucess(). - Se invoca la función
futureIncrementque crea la instancia deFuture. Esta instancia es VORAZ, lo que significa que el closure se ejecuta tan pronto como se crea la instancia, lo cual implica que (6 ) se imprime inmediatamente y (7) se imprime un segundo después, INCLUSO AUNQUE NO HAYA SUSCRIPTORES. - Se crea una suscripción con
sink(). Una vez que elFuturellame la promesa, se invocarán los métodosreceiveCompletionyreceiveValue. - Aquí se crea una segunda suscripción con
sink(). - Debido a que el código ejecutado por el
Futurees asíncono y la promesa se invoca un segundo después, se debe guardar una referencia a la suscripción en el conjuntosuscriptions.
A continuación se puede ver el resultado del programa:
Original // Inmediato
Terminé // 1 segundo después
Second 2 // 1 segundo después, ante la segunda suscripción
Second finished
First 2 // 1 segundo después, ante la primera suscripción
First finished
Se puede ver que Original y Terminé solo se imprimen una vez lo cual lleva a preguntarse: ¿acaso la segunda suscripción no debería volver a ejecutar la promesa? - No. En realidad el Future ejecuta una sola vez la promesa y emite el resultado en cada suscripción. Esto se debe a que el Future es voraz ("greedy"), lo que significa que se ejecuta tan pronto es creado. No requiere un suscriptor como los emisores regulares que son perezosos ("lazy").
Observar también que el orden de las impresiones de los mensajes no es el mismo que el de las suscripciones. Esto se debe a que no existe orden en las suscripciones, solo se las notifica a todas.
Subject
Un Subject es un tipo de dato que permite al código imperativo (que no usa Combine), conectarse con suscriptores de Combine. En otras palabras, permite enviar valores manualmente a los suscriptores.
Existe dos tipos:
PassthroughSubjectCurrentValueSubject
PassthroughSubject
- No guarda los valores emitidos
- Cada suscriptor recibe los eventos a partir del momento que se crea su suscripción.
final class StringSubscriber: Subscriber {
typealias Input = String
typealias Failure = MyError
func receive(subscription: Subscription) {
subscription.request(.max(2))
}
func receive(_ input: String) -> Subscribers.Demand {
print("Received value (1)", input)
return input == "World" ? .max(1) : .none
}
func receive(completion: Subscribers.Completion<MyError>) {
print("Received completion (1)", completion)
}
}
enum MyError: Error {
case test
}
//
let subject = PassthroughSubject<String, MyError>()
let subscriber = StringSubscriber()
subject.subscribe(subscriber)
//
let subscription = subject
.sink(
receiveCompletion: { completion in
print("Received completion (sink)", completion)
},
receiveValue: { value in
print("Received value (sink)", value)
}
)
// 1
subject.send("Hello")
subject.send("World")
// 2
subscription.cancel()
// 3
subject.send("Still there?")
// 4
subject.send(completion: .failure(MyError.test))
subject.send(completion: .finished)
// 5
subject.send("How about another one?")
En el código anterior tenemos:
- La instancia de tipo
PassthroughSubjectemite dos eventos: "Hello" y "World". Los dos suscriptores reciben los dos eventos, así que se imprimen cuatro mensajes. - Se cancela la suscripción del
sink. - Cuando la instancia de tipo
PassthroughSubjectvuelve a emitir un evento ("Still here?"), solo el suscriptor de tipoStringSubscriberrecibe el mensaje (porque elsinkya se canceló). - Se envía un
.failureo un.finished. El suscriptor de tipoStringSubscriberimprime el mensaje que corresponda del bloque de completion . - El
Subjectde tipoPassthroughSubjectemite otro mensaje pero, en esta ocasión, nadie puede procesarlo porque ya se había emitido antes el.failureo.finished.
CurrentValueSubject
- Guarda el último valor emitido.
- Cuando se crea una nueva suscripción, el suscriptor recibe el último valor emitido.
- Se debe inicializar con un valor.
var subscriptions = Set<AnyCancellable>()
// 1
let subject = CurrentValueSubject<Int, Never>(0)
// 2
subject.send(-1)
subject
.print() // 3 // Prints log messages for all publishing events
.sink(receiveValue: { print($0) })
.store(in: &subscriptions) // 4
// 5
subject.send(1)
subject.send(2)
// 6
print(subject.value)
// send is one way to send a new value. Another way is to assign a new value to its value property
// 7
subject.value = 3
// 8
subject
.print()
.sink(receiveValue: {
print("Second subscription:", $0)
})
.store(in: &subscriptions)
// 9
subject.send(completion: .finished)
En el código anterior tenemos:
- La instancia de tipo
CurrentValueSubjecttiene como valor inicial0, que será emitida a todo nuevo suscriptor tan pronto ocurra la suscripción. - El
Subjectenvía un -1. Como no hay nadie suscrito, nadie lo recibe. Sin embargo, elSubjectseguirá almacenando este valor y se lo enviará a los próximos suscriptores tan pronto se suscriban. - Se crea una suscripción tipo
sinksobre elSubject, y se modifica con unprint()para imprimir una traza por todos los eventos publicados. - Se guarda la suscripción en la colección de suscripciones para cancelarlas fácilmente después.
- Se emiten dos números. Esto provoca que el suscriptor actual los imprima en la consola.
-
CurrentValueSubjecttambién concede acceso al último valor emitido a través del atributovalue. - Al escribir el atributo
value, se lo emite a todos los suscriptores delSubject, por lo que elsinksuscrito alSubjectimprimirá el número asignado avalue. - Otro
sinkse suscribe alSubject, e inmediatamente recibe el último número asignado avalue. - Se termina de emitir eventos en el
Subjectal llamarsend(completion: .finished). En este caso, ambos suscriptores reciben el evento.
Ajustando la demanda automáticamente
Como se mencionó antes, el "BACKPRESSURE MANAGEMENT" permite al suscriptor regular el flujo de datos del emisor. Esto se consigue en los métodos receive(subscription:), donde se establece el límite máximo inicial de elementos a recibir, y receive(_:) donde se puede modificar el flujo establecido inicialmente.
Type erasure
En ocasiones se requiere ocultar el tipo de dato PassthroughSubject o CurrentValueSubject a los distintos suscriptores. Idealmente queremos usar la abstracción Publisher; sin embargo, debido a que este es un protocolo con requerimientos genéricos (associatedtype), es necesario usar la estructura AnyPublisher que cumple la función de "type-erasure" sobre el Subject. Esto significa que al recibir la instancia de tipo AnyPublisher no se puede hacer send(:) incluso aunque Subject sí tenga este método.
let subject = PassthroughSubject<Int, Never>()
// publisher de tipo AnyPublisher y no PassthroughSubject
let publisher = subject.eraseToAnyPublisher()
publisher
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)
subject.send(0)
// publisher.send(0) arroja error de compilación
Interoperabilidad entre Publishers de Combine y async/await
Toda instancia de Publisher, Future y Subject puede ser usado con la concurrencia moderna de Swift.
En el siguiente código, values retorna una secuencia asíncrona (AsyncPublisher<Self>) cuando Failure = Never, con los elementos emitidos por el Subject o el Publisher, que puede ser recorrida con un for await. Una vez que el Publisher termine (en éxito o falla), el bucle termina y la ejecución continúa en la siguiente línea.
let subject = CurrentValueSubject<Int, Never>(0)
Task {
for await element in subject.values {
print("Element: \(element)")
}
print("Completed.")
}
subject.send(1)
subject.send(99)
subject.send(completion: .finished)
En el siguiente código, Future emite un único elemento a través de la propiedad value que debe ser esperada con await.
let future = Future<Int, Never> { promise in
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
promise(.success(1))
}
}
Task {
let result = await future.value
print("result", result)
print("Completed.")
}
En caso de querer hacer alguna funcionalidad específica cuando se reciba un error (.failure) en un Suscriptor, se puede usar este código:
publisher
.sink { completion in
if case let .failure(error) = completion {
// ¡Hacer algo en caso de error!
}
} receiveValue: { hand in
...
}
Cuestionario
- ¿Qué diferencia hay entre un
PassthroughSubjecty un CurrentValueSubject? (Menciona al menos dos diferencias en su comportamiento o uso.) - Explica con tus palabras qué es el "Backpressure Management" y por qué es importante en Combine.
- ¿Qué significa que un
Futuresea "voraz" y cómo se diferencia de un Publisher común? - Describe el propósito del operador
assign(to:on:)y da un ejemplo de cuándo podría usarse en una app real. - ¿Qué función cumple
AnyPublisherdentro del contexto de "Type Erasure"? ¿Por qué podría ser útil? - Cuando un
Publisheremite un evento de finalización (.finishedo .failure): a) Puede seguir emitiendo valores. b) No puede emitir más eventos. c) Puede emitir solo si hay nuevos suscriptores. d) Se reinicia automáticamente. - El operador sink se caracteriza por: a) Permitir solo una suscripción a la vez. b) Tener demanda limitada a tres valores. c) Crear una suscripción con demanda ilimitada y closures para procesar valores y finalización. d) Cancelar automáticamente las suscripciones activas.
- ¿Qué sucede si no se guarda la referencia a un objeto AnyCancellable devuelto por una suscripción? a) La suscripción se mantiene activa hasta terminar. b) Se cancela automáticamente al salir del ámbito ("scope"). c) Se pausa temporalmente hasta reasignar la referencia. d) No afecta en nada el ciclo de vida de la suscripción.
- ¿Cuál de las siguientes afirmaciones sobre
Futurees correcta? a) Ejecuta su closure solo cuando aparece el primer suscriptor. b) Puede emitir múltiples valores a cada suscriptor. c) Ejecuta su closure inmediatamente al ser creado, incluso sin suscriptores. d) No puede ser usado conasync/await. - En el contexto de Combine, un
Subject: a) Solo recibe valores, pero no puede emitirlos. b) Permite enviar manualmente valores a los suscriptores desde código imperativo. c) Es siempre perezoso ("lazy") como los Publishers regulares. d) No puede emitir errores.
Solución
- ¿Qué diferencia hay entre un
PassthroughSubjecty unCurrentValueSubject? (Menciona al menos dos diferencias en su comportamiento o uso.) > Ambos Subject conectan una implementación imperativa a declarativa. CurrentValueSubject debe ser inicializado con un valor inicial y cada vez que recibe una nueva suscripción le envía inmediatamente el último valor emitido. PassthroughSubject no almacena estado. - Explica con tus palabras qué es el "Backpressure Management" y por qué es importante en Combine. > "Backpressure Management" consiste en que el suscriptor controla la demanda de valores del emisor, porque es posible que la velocidad de procesamiento de valores sea menor a la velocidad de emisión de los mismos.
- ¿Qué significa que un Future sea "voraz" y cómo se diferencia de un Publisher común? > El Future se ejecuta tan pronto se crea la instancia sin esperar a las suscripciones. El Publisher espera hasta que exista una suscripción para emitir sus valores.
- Describe el propósito del operador
assign(to:on:)y da un ejemplo de cuándo podría usarse en una app real. >assign(to:on:)re-emite los valores del "upstream" en una propiedad que admita KVO (to:) sobre un objeto específico (on:). Esto puede generar un ciclo de retención en caso de que el objeto referenciado sea precisamente quien retenga la referencia a la suscripción del Publisher. - ¿Qué función cumple
AnyPublisherdentro del contexto de "Type Erasure"? ¿Por qué podría ser útil? > Permite ocultar los detalles específicos del publisher. Idealmente quisiéramos usar Publisher, sin embargo, este protocolo tiene requerimientos genéricos. - Cuando un Publisher emite un evento de finalización (.finished o .failure): b) No puede emitir más eventos. ✅
- El operador
sinkse caracteriza por: c) Crear una suscripción con demanda ilimitada y closures para procesar valores y finalización. ✅ - ¿Qué sucede si no se guarda la referencia a un objeto
AnyCancellabledevuelto por una suscripción? b) Se cancela automáticamente al salir del ámbito ("scope"). ✅ - ¿Cuál de las siguientes afirmaciones sobre
Futurees correcta? c) Ejecuta su closure inmediatamente al ser creado, incluso sin suscriptores. ✅ - En el contexto de Combine, un Subject: b) Permite enviar manualmente valores a los suscriptores desde código imperativo. ✅
Top comments (0)