Reactive programming in action - part 1
This post shows how reactive programming is used in one of DataBeacon’s central software components, called Funnel. The post is inspired by the rubber duck debugging method.
Instead of covering the basics, for which there are much better resources out there, the focus will be on production-ready code, with real examples and a description of some of the architectural decisions.
Code snippets have been adapted specifically for this blog post; they are not a 1:1 copy of production code, and some implementation details have been hidden.
Introduction
The Funnel component sits between the Kafka topics and the SPA clients. It coordinates client connections and transforms a Kafka input stream into a web-socket connection (Socket.IO) adapted to each client’s status and preferences.
Kafka topics -> Funnel -> clients
Funnel is written in TypeScript and transpiled to run on Node, although it is currently being migrated to Go. This post focuses on the Node implementation, but I might cover the rationale and the migration to Go in a future post.
Let’s explore Funnel in detail.
Setting up clients
After setting up the environment details, the main code starts with:
const connection$ = await socketIOServer();
Here we create a connection$ observable of type fromSocketIO using rxjs-socket.io. For each new HTTP request, connection$ notifies its subscribers with an object of type Connector:
interface Connector<L extends EventsMap, S extends EventsMap> {
from: <Ev extends EventNames<L>>(eventName: Ev) => Observable<EventParam<L, Ev>>;
to: <Ev extends EventNames<S>>(eventName: Ev) => Observer<Parameters<S[Ev]>>;
id: string;
user?: string;
onDisconnect: (callback: () => void) => void;
}
Note the first two members: both are factories.
from takes an event name as a parameter and produces an Observable of the events received under that name.
to takes an event name as a parameter and produces an Observer that emits events under that name.
This allows things like from('action').subscribe(to('reducer')), which could be used to manage client state remotely.
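To make the factory idea concrete, here is a minimal sketch of how from and to could be built on top of a socket. It is not the rxjs-socket.io implementation: TinySocket, makeConnector, and the simplified TinyObserver/TinyObservable shapes are hypothetical stand-ins, chosen so the example runs without any dependencies.

```typescript
// Simplified stand-ins for RxJS Observer/Observable (assumption: only `next`
// and `subscribe` are modelled; no error or completion handling).
interface TinyObserver<T> { next: (value: T) => void; }
interface TinyObservable<T> { subscribe: (observer: TinyObserver<T>) => () => void; }

type Handler = (payload: unknown) => void;

// Hypothetical in-memory socket: emit() sends an event to the client,
// receive() simulates an event arriving from the client.
class TinySocket {
  readonly sent: Array<{ event: string; payload: unknown }> = [];
  private handlers: Record<string, Handler[]> = {};

  on(event: string, handler: Handler): () => void {
    const list = this.handlers[event] || [];
    this.handlers[event] = list.concat([handler]);
    return () => {
      this.handlers[event] = (this.handlers[event] || []).filter((h) => h !== handler);
    };
  }

  emit(event: string, payload: unknown): void {
    this.sent.push({ event, payload });
  }

  receive(event: string, payload: unknown): void {
    (this.handlers[event] || []).forEach((handler) => handler(payload));
  }
}

// Both members are factories: pass an event name, get one end of a stream back.
function makeConnector(socket: TinySocket) {
  return {
    from: (eventName: string): TinyObservable<unknown> => ({
      subscribe: (observer) => socket.on(eventName, (payload) => observer.next(payload)),
    }),
    to: (eventName: string): TinyObserver<unknown> => ({
      next: (payload) => socket.emit(eventName, payload),
    }),
  };
}

const socket = new TinySocket();
const { from, to } = makeConnector(socket);

// Every 'action' received from the client is sent back as a 'reducer' event.
const unsubscribe = from("action").subscribe(to("reducer"));
socket.receive("action", { type: "SELECT", id: 7 });
unsubscribe();
socket.receive("action", { type: "IGNORED" }); // no longer forwarded
```

After this runs, socket.sent holds a single 'reducer' event carrying the first payload. Because to returns a plain observer, it can be handed directly to subscribe, which is what makes the one-liner in the text possible.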
The id and user properties are self-descriptive, and onDisconnect registers a callback that will be executed when the client disconnects.
Under the hood, the Socket.IO server is protected by the auth0-socketio middleware to manage authentication with the Auth0 identity provider.
This connection object can be used to monitor connection activity:
connection$.subscribe(({ id, user, onDisconnect }) => {
log(`Connected ${user} through session ${id}`);
onDisconnect(() => log(`Disconnected ${user} from session ${id}`));
});
This interface is used by the client function to generate the client$ observable:
const client$ = connection$.pipe(map((connector) => client(connector)));
Each client has a state$ observable that emits the latest client state, which is managed with React+Redux on the client side:
client$.subscribe(({ state$ }) => state$.subscribe((state) => log(util.inspect(state, { depth: 4 }))));
The next thing we need is to attach a data source to the clients. Luckily, in addition to state$, each client implements an attachDataSource and a removeDataSource method. Only one source can be attached at a time: attachDataSource expects an observable, and removeDataSource is just a function that unsubscribes the client from the source updates.
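The one-source-at-a-time rule could be implemented roughly as follows. This is a sketch under assumptions, not Funnel's actual client: makeClient, TinySubject, and the emit callback are hypothetical, and attaching a second source here replaces the first one (the real implementation might reject it instead).

```typescript
// Simplified stand-ins for the RxJS types (assumption: `next` only).
interface TinyObserver<T> { next: (value: T) => void; }
interface TinySubscription { unsubscribe: () => void; }
interface TinyObservable<T> { subscribe: (observer: TinyObserver<T>) => TinySubscription; }

// Minimal hot stream, standing in for an RxJS Subject.
class TinySubject<T> implements TinyObservable<T> {
  private observers: TinyObserver<T>[] = [];

  subscribe(observer: TinyObserver<T>): TinySubscription {
    this.observers.push(observer);
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((o) => o !== observer);
      },
    };
  }

  next(value: T): void {
    // Copy first so observers can unsubscribe while being notified.
    this.observers.slice().forEach((o) => o.next(value));
  }
}

// Hypothetical client: `emit` is whatever pushes data down to the browser.
function makeClient<T>(emit: (value: T) => void) {
  let current: TinySubscription | undefined;

  const removeDataSource = (): void => {
    current?.unsubscribe();
    current = undefined;
  };

  const attachDataSource = (source$: TinyObservable<T>): void => {
    removeDataSource(); // assumption: a new source replaces the old one
    current = source$.subscribe({ next: emit });
  };

  return { attachDataSource, removeDataSource };
}

const delivered: string[] = [];
const client = makeClient<string>((msg) => delivered.push(msg));

const topicA$ = new TinySubject<string>();
const topicB$ = new TinySubject<string>();

client.attachDataSource(topicA$);
topicA$.next("a1"); // delivered
client.attachDataSource(topicB$);
topicA$.next("a2"); // dropped: topicA$ was detached
topicB$.next("b1"); // delivered
client.removeDataSource();
topicB$.next("b2"); // dropped: no source attached
```

Keeping the current subscription in a single variable is what enforces the rule: there is never more than one live subscription per client, and switching sources is just a detach followed by an attach.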
That’s all we need for now; we will describe how clients are generated in a future post. Let’s now set up the data sources.
Getting data from source
To transform a Kafka topic into an observable we use the rxkfk library. Details of the connection are hidden inside kafkaConnector, but it returns a typed observable with the messages of the given topic or topics:
const msg$ = await kafkaConnector();
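For readers curious about what such a connector does conceptually, the sketch below wraps a callback-style consumer in a typed observable. It does not use the rxkfk API or any real Kafka client: MessageConsumer, FakeConsumer, fromConsumer, and the Reading type are all hypothetical, and the fake consumer is fed by hand so the example runs without a broker.

```typescript
// Simplified stand-ins for the RxJS types (assumption: `next` only).
interface TinyObserver<T> { next: (value: T) => void; }
interface TinySubscription { unsubscribe: () => void; }
interface TinyObservable<T> { subscribe: (observer: TinyObserver<T>) => TinySubscription; }

// Hypothetical callback-style consumer, not a real Kafka client API.
interface RawMessage { topic: string; value: string; }
interface MessageConsumer {
  onMessage: (handler: (message: RawMessage) => void) => void;
  stop: () => void;
}

// Wraps the consumer in a typed observable; `decode` turns the raw payload into T.
function fromConsumer<T>(consumer: MessageConsumer, decode: (raw: string) => T): TinyObservable<T> {
  return {
    subscribe: (observer) => {
      let active = true;
      consumer.onMessage((message) => {
        if (active) {
          observer.next(decode(message.value));
        }
      });
      return {
        unsubscribe: () => {
          active = false;
          consumer.stop();
        },
      };
    },
  };
}

// In-memory fake: push() simulates a message arriving on a topic.
class FakeConsumer implements MessageConsumer {
  stopped = false;
  private handler: (message: RawMessage) => void = () => {};

  onMessage(handler: (message: RawMessage) => void): void {
    this.handler = handler;
  }

  stop(): void {
    this.stopped = true;
  }

  push(topic: string, value: string): void {
    this.handler({ topic, value });
  }
}

// Hypothetical message shape, used only for this illustration.
interface Reading { id: string; value: number; }

const consumer = new FakeConsumer();
const reading$ = fromConsumer<Reading>(consumer, (raw) => JSON.parse(raw) as Reading);

const seen: Reading[] = [];
const subscription = reading$.subscribe({ next: (reading) => seen.push(reading) });

consumer.push("readings", JSON.stringify({ id: "r1", value: 42 }));
subscription.unsubscribe();
consumer.push("readings", JSON.stringify({ id: "r2", value: 7 })); // ignored
```

Only the first reading ends up in seen, and unsubscribing also stops the consumer. A real connector would additionally have to deal with errors, reconnection, and offsets, none of which is modelled here.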
We can monitor input data with a simple subscription:
msg$.subscribe((message) => log(`Got a new message: ${message}`));
Finally, each client has to be subscribed individually. If we want to attach all clients to the same source, we simply subscribe to client$ and link each individual client to the msg$ observable:
client$.subscribe((client) => client.attachDataSource(msg$));
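The effect of this wiring is a fan-out: every connected client receives every message from the shared source. The sketch below illustrates it under one loudly stated assumption, namely that msg$ is hot, so clients that connect late do not get earlier messages replayed. TinySubject and fakeClient are hypothetical stand-ins for the RxJS and Funnel types.

```typescript
// Simplified stand-ins for the RxJS types (assumption: `next` only, and
// subscribe takes an observer object rather than a bare callback).
interface TinyObserver<T> { next: (value: T) => void; }
interface TinyObservable<T> { subscribe: (observer: TinyObserver<T>) => void; }

// Minimal hot stream, standing in for an RxJS Subject.
class TinySubject<T> implements TinyObservable<T> {
  private observers: TinyObserver<T>[] = [];

  subscribe(observer: TinyObserver<T>): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    this.observers.slice().forEach((o) => o.next(value));
  }
}

// Hypothetical client that just records what it receives.
interface FakeClient {
  id: string;
  inbox: string[];
  attachDataSource: (source$: TinyObservable<string>) => void;
}

function fakeClient(id: string): FakeClient {
  const inbox: string[] = [];
  return {
    id,
    inbox,
    attachDataSource: (source$) => source$.subscribe({ next: (m) => inbox.push(m) }),
  };
}

const msg$ = new TinySubject<string>();
const client$ = new TinySubject<FakeClient>();

// Same wiring as in the post: each new client is attached to the shared source.
client$.subscribe({ next: (client) => client.attachDataSource(msg$) });

const alice = fakeClient("alice");
const bob = fakeClient("bob");

client$.next(alice); // alice connects
msg$.next("m1");     // only alice is attached at this point
client$.next(bob);   // bob connects
msg$.next("m2");     // both receive it
```

Here alice.inbox ends up with both messages while bob.inbox only contains "m2", because bob connected after "m1" was published.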
In addition to attaching data sources, clients can unsubscribe by calling the client.removeDataSource() method. This allows clients to change data sources dynamically.
Coming next
So far we have covered the basic structure of the code: we created two observables, one for the client side and one for the server side, and programmagically ✨ connected both.
In the next chapter we will fill in the gaps and describe how clients and data sources are connected, how clients are created from connections, and how data sources are filtered using a projection and combination operator.