The task is to implement an Observable.
The bolierplate code
class Observable {
constructor(setup) {
}
subscribe(subscriber) {
}
}
An Observable has two functions - the setup function that creates values and the subscriber function that receives values. The constructor should enforce that an Observable produces values and stores the logic for later execution.
constructor(setup) {
if (typeof setup !== "function") {
throw new Error("Observable constructor requires a function");
}
this._setup = setup;
}
Calling subscribe starts the execution. It creates a new execution and allows multiple executions of the same observable.
subscribe(subscriber) {
The observer should always be an object observer
const observer = typeof subscriber === "function" ? { next: subscriber} : subscriber || {};
The state is controlled. Once an Observable errors, completes, or is unsubscribed, no more values are allowed
let closed = false;
Declare cleanup early, because the setup function may call observer.complete() or error() before setup finishes
let cleanup = () => {};
Ensure that there are no emissions after cleanup, no emissions after error, and the cleanup runs only once
const safeObserver = {
next: (value) => {
if(!closed && observer.next) {
observer.next(value)
}
},
error: (err) => {
if(!closed) {
closed = true;
observer.error?.(err);
cleanup()
}
},
complete: () => {
if(!closed) {
closed = true;
observer.complete?.();
cleanup()
}
}
}
Run the setup function
const result = this._setup(safeObserver);
if (typeof result === "function") {
cleanup = result;
}
Return the subscription
return {
unsubscribe() {
if (!closed) {
closed = true;
cleanup();
}
}
};
This ensures the cleanup runs once. The final code
class Observable {
constructor(setup) {
if(typeof setup !== "function") {
throw new Error("Observable constructor requires a function");
}
this._setup = setup;
}
subscribe(subscriber) {
const observer = typeof subscriber === "function" ? { next: subscriber} : subscriber || {};
let closed = false;
let cleanup = () => {};
const safeObserver = {
next: (value) => {
if(!closed && observer.next) {
observer.next(value)
}
},
error: (err) => {
if(!closed) {
closed = true;
observer.error?.(err);
cleanup()
}
},
complete: () => {
if(!closed) {
closed = true;
observer.complete?.();
cleanup()
}
}
}
const result = this._setup(safeObserver);
if (typeof result === "function") {
cleanup = result;
}
return {
unsubscribe() {
if(!closed) {
closed = true;
cleanup();
}
}
}
}
}
That's all folks!
Top comments (0)