DEV Community

Samuel Cristobal


Reactive programming in action - part 1


This post shows how reactive programming is used in one of DataBeacon’s central software components, called Funnel. The post is inspired by the rubber duck debugging method.

There are much better resources out there for the basics, so instead of covering them, the focus here is on production-ready code, with real examples and a description of some of the architectural decisions.

Code snippets have been adapted specifically for this blog post; they are not a 1:1 copy of production code, and some implementation details have been hidden.

Introduction

The Funnel component sits between the Kafka topics and the SPA clients. It coordinates client connections and transforms a Kafka input stream into a WebSocket connection (Socket.IO) adapted to each client’s status and preferences.

Kafka topics -> Funnel -> clients

Funnel is written in TypeScript and transpiled to JavaScript for Node, although it is currently being migrated to Go. This post focuses on the Node implementation, but I might cover the rationale and the migration to Go in a future post.

Let’s explore Funnel in detail.

Setting up clients

After setting up the environment details, the main code starts with:

const connection$ = await socketIOServer();

Here we create a connection$ observable of type fromSocketIO using rxjs-socket.io. For each new HTTP request, connection$ notifies its subscribers with an object of type Connector:

interface Connector<L extends EventsMap, S extends EventsMap> {
    from: <Ev extends EventNames<L>>(eventName: Ev) => Observable<EventParam<L, Ev>>;
    to: <Ev extends EventNames<S>>(eventName: Ev) => Observer<Parameters<S[Ev]>>;
    id: string;
    user?: string;
    onDisconnect: (callback: () => void) => void;
}

Note the first two methods; both are factories:

  • from takes an event name as a parameter and produces an Observable of the events received under that name.

  • to takes an event name as a parameter and produces an Observer that emits events under that name.

This allows things like from('action').subscribe(to('reducer')), which could be used to manage client state remotely.
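To make the factory idea concrete, here is a minimal sketch of how a from/to pair could forward one event stream into another. It is not the rxjs-socket.io implementation: FakeSocket, makeConnector, MiniObservable and MiniObserver are hypothetical stand-ins, written without dependencies so the example runs on its own, and the real code uses the RxJS Observable and Observer types instead.

```typescript
// Assumption: simplified stand-ins for the RxJS Observer/Observable types.
interface MiniObserver<T> {
  next: (value: T) => void;
}
interface MiniObservable<T> {
  // Returns a function that cancels the subscription.
  subscribe: (observer: MiniObserver<T>) => () => void;
}

type Listener = (payload: unknown) => void;

// Hypothetical in-memory socket: `receive` simulates a message arriving from
// the client, `sent` records everything emitted back to it.
class FakeSocket {
  readonly sent: Array<{ event: string; payload: unknown }> = [];
  private listeners: Array<{ event: string; listener: Listener }> = [];

  on(event: string, listener: Listener): () => void {
    const entry = { event, listener };
    this.listeners.push(entry);
    return () => {
      this.listeners = this.listeners.filter((e) => e !== entry);
    };
  }

  emit(event: string, payload: unknown): void {
    this.sent.push({ event, payload });
  }

  receive(event: string, payload: unknown): void {
    this.listeners
      .filter((e) => e.event === event)
      .forEach((e) => e.listener(payload));
  }
}

// Builds the two factories for one socket.
function makeConnector(socket: FakeSocket) {
  return {
    // from: event name -> observable of the payloads received under that name.
    from: (event: string): MiniObservable<unknown> => ({
      subscribe: (observer) => socket.on(event, (payload) => observer.next(payload)),
    }),
    // to: event name -> observer that emits every value under that name.
    to: (event: string): MiniObserver<unknown> => ({
      next: (payload) => socket.emit(event, payload),
    }),
  };
}

const demoSocket = new FakeSocket();
const connector = makeConnector(demoSocket);

// Every incoming "action" is sent back to the client as a "reducer" event.
const stopForwarding = connector.from("action").subscribe(connector.to("reducer"));

demoSocket.receive("action", { type: "increment" }); // forwarded
stopForwarding();
demoSocket.receive("action", { type: "ignored" }); // no longer forwarded
```

Because to returns a plain observer, it can be passed directly to subscribe, which is what makes the one-line wiring possible.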

The properties id and user are self-descriptive, and onDisconnect registers a callback that is executed when the client disconnects.

Under the hood, the Socket.IO server is protected by the auth0-socketio middleware to manage authentication with the Auth0 identity provider.

This connection object can be used to monitor connection activity:

connection$.subscribe(({ id, user, onDisconnect }) => {
    log(`Connected ${user} through session ${id}`);
    onDisconnect(() => log(`Disconnected ${user} from session ${id}`));
});

The client function uses this interface to turn connection$ into an observable of clients, client$:

const client$ = connection$.pipe(map((connector) => client(connector)));

Each client exposes a state$ observable that emits updates of the client state, which is implemented using React+Redux:

client$.subscribe(({ state$ }) => state$.subscribe((state) => log(util.inspect(state, { depth: 4 }))));

The next thing we need is to attach a data source to the clients. Luckily, in addition to state$, each client implements an attachDataSource and a removeDataSource method.

Only one source can be attached at a time. attachDataSource expects an observable, and removeDataSource is just a function that unsubscribes the client from the source updates.
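As a rough illustration of the one-source-at-a-time rule, the sketch below keeps a single active subscription per client. It is not the production client: createClient, createSource and DataSource are hypothetical names, the observable is reduced to a dependency-free stand-in, and the sketch assumes that attaching a new source silently replaces the previous one (the real client may behave differently).

```typescript
// Assumption: a reduced stand-in for an observable, where subscribe
// returns an unsubscribe function.
interface DataSource<T> {
  subscribe: (next: (value: T) => void) => () => void;
}

// Hypothetical source we can push values into, used only for the demo.
function createSource<T>(): DataSource<T> & { push: (value: T) => void } {
  let listeners: Array<(value: T) => void> = [];
  return {
    subscribe: (next) => {
      listeners.push(next);
      return () => {
        listeners = listeners.filter((l) => l !== next);
      };
    },
    push: (value) => listeners.forEach((l) => l(value)),
  };
}

// Hypothetical client: `deliver` stands for whatever sends data to the browser.
function createClient<T>(deliver: (value: T) => void) {
  let detach: (() => void) | undefined;

  const removeDataSource = (): void => {
    if (detach) {
      detach(); // unsubscribe from the current source
      detach = undefined;
    }
  };

  const attachDataSource = (source: DataSource<T>): void => {
    removeDataSource(); // assumption: a new source replaces the previous one
    detach = source.subscribe(deliver);
  };

  return { attachDataSource, removeDataSource };
}

const received: string[] = [];
const demoClient = createClient<string>((value) => {
  received.push(value);
});
const sourceA = createSource<string>();
const sourceB = createSource<string>();

demoClient.attachDataSource(sourceA);
sourceA.push("a-1"); // delivered
demoClient.attachDataSource(sourceB); // switches from A to B
sourceA.push("a-2"); // dropped, A is no longer attached
sourceB.push("b-1"); // delivered
demoClient.removeDataSource();
sourceB.push("b-2"); // dropped, nothing is attached
```

Holding only the teardown function of the current subscription is enough to guarantee that a client never receives updates from two sources at once.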

That’s all we need for now; we will describe the client generation in a future post. Let’s now set up the data sources.

Getting data from source

To transform a Kafka topic into an observable we use the rxkfk library. Details of the connection are hidden inside kafkaConnector, but it returns a typed observable with the messages of the given topic or topics.

const msg$ = await kafkaConnector();

We can monitor input data with a simple subscription:

msg$.subscribe((message) => log(`Got a new message: ${message}`));

Finally, each client has to be subscribed individually. If we want to attach all clients to the same source, we simply subscribe to client$ and link each individual client to the msg$ observable:

client$.subscribe((client) => client.attachDataSource(msg$));

In addition to attaching data sources, clients can unsubscribe by calling the client.removeDataSource() method. This allows clients to change data sources dynamically.

Coming next

So far we have covered the basic structure of the code: we created two observables, one for the client side and one for the server side, and programmagically ✨ connected both.

In the next chapter we will fill the gaps and describe how clients and data sources are connected, how to create clients from connections, and how data sources are filtered using a projection and combination operator.
