TLDR, A callback is a function we pass to another, source function, in order to give the source function a way to talk back to us (call us back). Callbag is a standard for callbacks that enables working with streams. A callbag is any callback that follows that standard.
Callbacks
Take this code:
console.log(source());
When we run this code:
-
source()
is called. We wait for its output. - We log the output from
source()
.
What if source()
takes some time to produce data? Instead of waiting for it, we could tell it to "give us data" (by calling it), and give it a way to call us back when it has some data:
source(data => console.log(data));
Here, data => console.log(data)
is a callback, as its the method we provide source()
to call us back when it has data. source
is basically a source of data, and we can communicate with it as follows:
-> [GREETING] "Give me data when you have it" # --> us to source
<- [DATA] "Here is some data" # --> source to us
Streams
Now what if our source (e.g. source()
) produces an indeterminate number of data entries? For example, our source might be a function responsible for calculating the position of the cursor on X-axis, or it might be a function who is supposed to give us messages coming from a web-socket.
👉 A source that produces an indeterminate number of data entries at indeterminate time intervals is called a stream.
In this case our simplistic callback (or the communication scheme) will be rather limiting:
- We might want to tell the source to stop sending more data.
- The source might want to tell us that it won't send more data (maybe due to some error).
- Some sources push data whenever possible. Others might wait for us to ask them explicitly for more data, in which case we would need to be able to ask for more data as well.
None of these are available under our previous communication scheme, and we need an expanded communication scheme to be able to work properly with streams:
-> [GREETING] "Give me data whenever you have some" # --> us to source
<- [GREETING] "I will give you data whenever I have some. Tell me when to stop" # --> source to us
<- [DATA] "Here is some data" # --> source to us
-> [DATA] "Give me more data" # --> us to source, when it needs to be pulled
-> [END] "Stop sending more data" # --> us to source
<- [END] "I won't be sending more data (because of X)" # --> source to us
To accomodate it we can have our callback accept two arguments instead of one: The first argument denoting the type of the message, the second one denoting the content (or payload):
source((type, payload) => {
if (type === GREET) console.log('Listening ...');
if (type === DATA) console.log(payload);
if (type === END) console.log('Source ended!');
});
Callbags
Callbag is just a term to denote any callback (or function) that looks like the callback we just designed for talking with source()
. In other words, any function with the following signature is a callbag:
(type: GREET | DATA | END, payload?: any) => void;
👉 In the callbag spec, message types are denoted by numbers:
0
stands forGREET
(also calledSTART
)1
stands forDATA
2
stands forEND
.
Now lets look at the above example again:
source((type, payload) => {
if (type === GREET) console.log('Listening ...');
if (type === DATA) console.log(payload);
if (type === END) console.log('Source ended!');
});
Here, source
is NOT a callbag, since it only accepts one argument. We can fix that by making source
accept two
arguments as well, in which case our code would change like this:
source(GREET, (type, payload) => {
if (type === GREET) console.log('Listeing ...');
if (type === DATA) console.log(payload);
if (type === END) console.log('Source ended!');
});
Now what if we want to receive a limited number of data entries (say 5) from source
? We greeted source
by calling it with GREET
alongside a callbag. According to our communication scheme, source
needs to also greet us by sending us GREET
alongside a way to tell it to stop, i.e. another callbag:
let talkback;
let N = 0;
source(GREET, (type, payload) => {
if (type === GREET) {
talkback = payload; // --> when type is GREET, payload is a callbag
console.log('Listening ...');
}
if (type === DATA) {
console.log(payload); // --> when type is DATA, payload is the data sent by source
N++;
if (N >= 5) talkback(END); // --> telling the source to stop
}
if (type === END) console.log('Source ended!');
});
👉 So whenever someone greets someone (us greeting the source, the source greeting us), the payload should be another callbag, acting as a way to talk back to the greeter. In this example,
talkback
plays that role.
Callbag Sources
So far we've just worked with source()
as a stream, without looking inside it. But how would a callbag source actually look like? To see that, lets build a simple callbag source that outputs an increasing number every second:
const source = (type, payload) => {
if (type === GREET) { // --> everything starts with a GREET
let talkback = payload; // --> when greeted, the payload is a way to talk back to the greeter
let i = 0;
setInterval(() => talkback(DATA, i++), 1000); // --> lets tell the greeter about our increasing number every second
}
}
☝️ Here, we are not giving the caller any method of telling the source to stop sending more data. This is because we are not following the communication protocol properly: the source MUST greet back and provide a way of talking back (i.e. another callbg):
const source = (type, payload) => {
if (type === GREET) {
let talkback = payload;
let i = 0;
const interval = setInterval(() => talkback(DATA, i++), 1000);
talkback(GREET, (_type, _payload) => {
if (_type === END) clearInterval(interval);
});
}
}
Callbags in Practice
In practice, you rarely need to greet sources or handle talkbacks manually. Utilities such as those provided in callbag-common take care of that for you:
import { interval, pipe, map, filter, subscribe } from 'callbag-common'
const source = interval(1000) // --> emits every second
pipe(
source,
map(x => x * 3), // --> multiply by 3
filter(x => x % 2), // --> only allow odd numbers
subscribe(console.log) // --> log any incoming number
)
> 3
> 9
> 15
> 21
> 27
The workflow is typically like this:
👉 You create some callbag sources, using source factories:
import { interval } from 'callbag-common';
const source = interval(1000);
👉 You then transform these sources using operators.
For example, you might want to multiply each received number by 3
:
import { interval, map } from 'callbag-common';
let source = interval(1000);
source = map(n => n * 3)(source);
Or you might want to only pick odd numbers:
import { interval, map, filter } from 'callbag-common';
let source = interval(1000);
source = map(n => n * 3)(source);
source = filter(n => n % 2)(source);
👉 Finally, you start listening to your transformed source by subscribing to it:
import { interval, map, filter, subscribe } from 'callbag-common';
let source = interval(1000);
source = map(n => n * 3)(source);
source = filter(n => n % 2)(source);
subscribe(console.log)(source);
👉 It is also highly recommended to use the
pipe()
utility for transforming your sources and subscribing to them, as it makes the code much easier to read:
import { interval, map, filter, subscribe, pipe } from 'callbag-common';
pipe(
interval(1000),
map(n => n * 3),
filter(n => n % 2),
subscribe(console.log)
)
Top comments (0)