<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Samuel Cristobal</title>
    <description>The latest articles on DEV Community by Samuel Cristobal (@typesamuel).</description>
    <link>https://dev.to/typesamuel</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F547196%2F2af812bb-3807-490a-85c9-ed0b4d259704.jpeg</url>
      <title>DEV Community: Samuel Cristobal</title>
      <link>https://dev.to/typesamuel</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/typesamuel"/>
    <language>en</language>
    <item>
      <title>Reactive programming in action - part 1</title>
      <dc:creator>Samuel Cristobal</dc:creator>
      <pubDate>Sat, 02 Apr 2022 17:57:47 +0000</pubDate>
      <link>https://dev.to/typesamuel/reactive-programming-in-action-part-1-h75</link>
      <guid>https://dev.to/typesamuel/reactive-programming-in-action-part-1-h75</guid>
      <description>&lt;h1&gt;
  
  
  Reactive programming in action - part 1
&lt;/h1&gt;

&lt;p&gt;This post shows how &lt;a href="https://reactivex.io/"&gt;reactive programming&lt;/a&gt; is used in &lt;em&gt;Funnel&lt;/em&gt;, one of &lt;a href="//www.databeacon.aero"&gt;DataBeacon&lt;/a&gt;’s central software components. The post is inspired by the &lt;a href="https://rubberduckdebugging.com/"&gt;rubber duck debugging method&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Rather than covering the basics, for which there are much better resources out there, this post focuses on production-ready code, with real examples and a description of some of the architectural decisions.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Code snippets have been adapted specifically for this blog post; they are not a 1:1 copy of the production code, and some implementation details have been hidden.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;The &lt;em&gt;Funnel&lt;/em&gt; component sits between the &lt;a href="https://kafka.apache.org/"&gt;Kafka&lt;/a&gt; topics and the &lt;a href="https://developer.mozilla.org/en-US/docs/Glossary/SPA"&gt;SPA&lt;/a&gt; clients. It coordinates client connections and transforms a Kafka input stream into a WebSocket connection (Socket.IO) adapted to each client’s status and preferences.&lt;/p&gt;

&lt;p&gt;Kafka topics -&amp;gt; Funnel -&amp;gt; clients&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Funnel&lt;/em&gt; is written in &lt;a href="https://www.typescriptlang.org/"&gt;TypeScript&lt;/a&gt; and compiled to JavaScript that runs on &lt;a href="https://nodejs.org/en/"&gt;Node&lt;/a&gt;, although it is currently being migrated to &lt;a href="https://go.dev/"&gt;Go&lt;/a&gt;. This post focuses on the Node implementation, but I might cover the rationale for the migration to Go in a future post.&lt;/p&gt;

&lt;p&gt;Let’s explore &lt;em&gt;Funnel&lt;/em&gt; in detail.&lt;/p&gt;

&lt;h2&gt;
  
  
  Setting up clients
&lt;/h2&gt;

&lt;p&gt;After setting up the environment details, the main code starts with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;connection$&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;socketIOServer&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here we create a &lt;code&gt;connection$&lt;/code&gt; observable of type &lt;code&gt;fromSocketIO&lt;/code&gt; using &lt;a href="https://github.com/scristobal/rxjs-socket.io"&gt;rxjs-socket.io&lt;/a&gt;. For each new HTTP request, &lt;code&gt;connection$&lt;/code&gt; notifies its subscribers with an object of type &lt;code&gt;Connector&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="kr"&gt;interface&lt;/span&gt; &lt;span class="nx"&gt;Connector&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;L&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nx"&gt;EventsMap&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;S&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nx"&gt;EventsMap&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="na"&gt;from&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;Ev&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nx"&gt;EventNames&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;L&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="na"&gt;eventName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;Ev&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;Observable&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;EventParam&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;L&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;Ev&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="nl"&gt;to&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;Ev&lt;/span&gt; &lt;span class="kd"&gt;extends&lt;/span&gt; &lt;span class="nx"&gt;EventNames&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;S&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="na"&gt;eventName&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nx"&gt;Ev&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;Observer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;Parameters&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nx"&gt;S&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nx"&gt;Ev&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="nl"&gt;id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="nl"&gt;user&lt;/span&gt;&lt;span class="p"&gt;?:&lt;/span&gt; &lt;span class="kr"&gt;string&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="nl"&gt;onDisconnect&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="na"&gt;callback&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;void&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;void&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Note the first two methods; both are factories:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;from&lt;/code&gt; takes an event name as a parameter and produces an &lt;code&gt;Observable&lt;/code&gt; of the events &lt;a href="https://socket.io/docs/v4/server-api/#socketoneventname-callback"&gt;received&lt;/a&gt; under that name.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;code&gt;to&lt;/code&gt; takes an event name as a parameter and produces an &lt;code&gt;Observer&lt;/code&gt; that &lt;a href="https://socket.io/docs/v4/server-api/#socketemiteventname-args"&gt;emits&lt;/a&gt; events under that name.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This allows things like &lt;code&gt;from('action').subscribe(to('reducer'))&lt;/code&gt; which could be used to manage client state remotely.&lt;/p&gt;
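
&lt;p&gt;To make the factory idea concrete, here is a minimal sketch of how such a pair could be built. It is not the rxjs-socket.io implementation: &lt;code&gt;makeFrom&lt;/code&gt; and &lt;code&gt;makeTo&lt;/code&gt; are hypothetical helpers, a Node &lt;code&gt;EventEmitter&lt;/code&gt; stands in for the socket (so emitted events simply loop back locally), and the &lt;code&gt;Observable&lt;/code&gt; and &lt;code&gt;Observer&lt;/code&gt; interfaces are simplified stand-ins for the RxJS types, so that the snippet runs without dependencies.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;import { EventEmitter } from "node:events";

// Simplified stand-ins for the RxJS types of the same name (assumption).
interface Observer&amp;lt;T&amp;gt; { next: (value: T) =&amp;gt; void }
interface Subscription { unsubscribe: () =&amp;gt; void }
interface Observable&amp;lt;T&amp;gt; { subscribe: (observer: Observer&amp;lt;T&amp;gt;) =&amp;gt; Subscription }

// Hypothetical factory: turns incoming events of a given name into an observable.
const makeFrom = (socket: EventEmitter) =&amp;gt;
    (eventName: string): Observable&amp;lt;unknown[]&amp;gt; =&amp;gt; ({
        subscribe: (observer) =&amp;gt; {
            const listener = (...args: unknown[]) =&amp;gt; observer.next(args);
            socket.on(eventName, listener);
            // Unsubscribing removes the listener from the socket.
            return { unsubscribe: () =&amp;gt; { socket.off(eventName, listener); } };
        },
    });

// Hypothetical factory: an observer that emits whatever it receives under the given name.
const makeTo = (socket: EventEmitter) =&amp;gt;
    (eventName: string): Observer&amp;lt;unknown[]&amp;gt; =&amp;gt; ({
        next: (args) =&amp;gt; { socket.emit(eventName, ...args); },
    });

// The EventEmitter is a local stand-in for a Socket.IO socket.
const socket = new EventEmitter();
const from = makeFrom(socket);
const to = makeTo(socket);

// Record everything that goes out as a 'reducer' event.
const received: unknown[][] = [];
socket.on("reducer", (...args: unknown[]) =&amp;gt; received.push(args));

const subscription = from("action").subscribe(to("reducer"));
socket.emit("action", { type: "increment" }); // forwarded as 'reducer'
subscription.unsubscribe();
socket.emit("action", { type: "ignored" });   // no longer forwarded
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;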

&lt;p&gt;The properties &lt;code&gt;id&lt;/code&gt; and &lt;code&gt;user&lt;/code&gt; are self-explanatory, and &lt;code&gt;onDisconnect&lt;/code&gt; registers a callback that will be executed upon the client’s disconnection.&lt;/p&gt;

&lt;p&gt;Under the hood, the Socket.IO server is protected by the &lt;a href="https://www.npmjs.com/package/auth0-socketio"&gt;auth0-socketio&lt;/a&gt; middleware to manage authentication with the Auth0 identity provider.&lt;/p&gt;

&lt;p&gt;This connection object can be used to monitor connection activity:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="nx"&gt;connection$&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;(({&lt;/span&gt; &lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;user&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nx"&gt;onDisconnect&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`Connected &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;user&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; through session &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="nx"&gt;onDisconnect&lt;/span&gt;&lt;span class="p"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`Disconnected &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;user&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; from session &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;id&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
&lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The function &lt;code&gt;client&lt;/code&gt; uses this interface to turn each connector into a client, which gives us the &lt;code&gt;client$&lt;/code&gt; observable:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;client$&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nx"&gt;connection$&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;pipe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;map&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;connector&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;connector&lt;/span&gt;&lt;span class="p"&gt;)));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each &lt;code&gt;client&lt;/code&gt; exposes a &lt;code&gt;state$&lt;/code&gt; observable that emits updates of the client state, which is managed on the client side with React+Redux:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="nx"&gt;client$&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;(({&lt;/span&gt; &lt;span class="nx"&gt;state$&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;state$&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;state&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;util&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;inspect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;state&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="na"&gt;depth&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt; &lt;span class="p"&gt;}))));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The next thing we need is to attach a data source to the clients. Luckily, in addition to &lt;code&gt;state$&lt;/code&gt;, each client implements an &lt;code&gt;attachDataSource&lt;/code&gt; and a &lt;code&gt;removeDataSource&lt;/code&gt; method.&lt;/p&gt;

&lt;p&gt;Only one source can be attached at a time. &lt;code&gt;attachDataSource&lt;/code&gt; expects an observable, and &lt;code&gt;removeDataSource&lt;/code&gt; is just a function that unsubscribes the client from the source updates.&lt;/p&gt;
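
&lt;p&gt;As a rough illustration of this contract, the sketch below keeps at most one subscription per client. It is not the production &lt;code&gt;client&lt;/code&gt;: &lt;code&gt;SketchClient&lt;/code&gt;, its &lt;code&gt;send&lt;/code&gt; callback and &lt;code&gt;makeSource&lt;/code&gt; are hypothetical names, the interfaces are simplified stand-ins for the RxJS types, and replacing the previous source on a second &lt;code&gt;attachDataSource&lt;/code&gt; call is an assumption about how the one-source rule could be enforced.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;// Simplified stand-ins for the RxJS types of the same name (assumption).
interface Subscription { unsubscribe: () =&amp;gt; void }
interface Observable&amp;lt;T&amp;gt; { subscribe: (next: (value: T) =&amp;gt; void) =&amp;gt; Subscription }

// Hypothetical client that holds at most one subscription to a data source.
class SketchClient&amp;lt;T&amp;gt; {
    private current?: Subscription;
    private readonly send: (value: T) =&amp;gt; void;

    constructor(send: (value: T) =&amp;gt; void) {
        this.send = send;
    }

    attachDataSource(source$: Observable&amp;lt;T&amp;gt;): void {
        this.removeDataSource(); // assumption: a new source replaces the old one
        this.current = source$.subscribe(this.send);
    }

    removeDataSource(): void {
        this.current?.unsubscribe();
        this.current = undefined;
    }
}

// Hypothetical hand-rolled source, used here in place of a Kafka-backed observable.
function makeSource&amp;lt;T&amp;gt;() {
    const listeners = new Set&amp;lt;(value: T) =&amp;gt; void&amp;gt;();
    return {
        emit: (value: T) =&amp;gt; listeners.forEach((listener) =&amp;gt; listener(value)),
        subscribe: (next: (value: T) =&amp;gt; void): Subscription =&amp;gt; {
            listeners.add(next);
            return { unsubscribe: () =&amp;gt; { listeners.delete(next); } };
        },
    };
}

const delivered: string[] = [];
const sketchClient = new SketchClient&amp;lt;string&amp;gt;((value) =&amp;gt; delivered.push(value));

const a$ = makeSource&amp;lt;string&amp;gt;();
const b$ = makeSource&amp;lt;string&amp;gt;();

sketchClient.attachDataSource(a$);
a$.emit("a1");                     // delivered
sketchClient.attachDataSource(b$); // replaces a$
a$.emit("a2");                     // dropped, a$ is no longer attached
b$.emit("b1");                     // delivered
sketchClient.removeDataSource();
b$.emit("b2");                     // dropped, no source attached
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;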

&lt;p&gt;That’s all we need for now; we will describe the &lt;code&gt;client&lt;/code&gt; generation in a future post. Let’s now set up the data sources.&lt;/p&gt;

&lt;h2&gt;
  
  
  Getting data from source
&lt;/h2&gt;

&lt;p&gt;To transform a Kafka topic into an observable, we use the &lt;a href="https://www.npmjs.com/package/rxjs-kafka"&gt;rxkfk&lt;/a&gt; library. Details of the connection are hidden inside the &lt;code&gt;kafkaConnector&lt;/code&gt;, but it returns a typed observable with the messages of the given topic(s).&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="kd"&gt;const&lt;/span&gt; &lt;span class="nx"&gt;msg$&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nx"&gt;kafkaConnector&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can monitor input data with a simple subscription:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="nx"&gt;msg$&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;log&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s2"&gt;`Got a new message: &lt;/span&gt;&lt;span class="p"&gt;${&lt;/span&gt;&lt;span class="nx"&gt;message&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;`&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Finally, each client has to be subscribed individually. To attach all clients to the same source, we simply subscribe to &lt;code&gt;client$&lt;/code&gt; and link each individual &lt;code&gt;client&lt;/code&gt; to the &lt;code&gt;msg$&lt;/code&gt; observable:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight typescript"&gt;&lt;code&gt;&lt;span class="nx"&gt;client$&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;((&lt;/span&gt;&lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="nx"&gt;client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nx"&gt;attachDataSource&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nx"&gt;msg$&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In addition to attaching data sources, clients can unsubscribe by calling the &lt;code&gt;client.removeDataSource()&lt;/code&gt; method. This allows clients to change data sources dynamically.&lt;/p&gt;

&lt;h2&gt;
  
  
  Coming next
&lt;/h2&gt;

&lt;p&gt;So far we have covered the basic structure of the code: we created two observables, one for the client side and one for the server side, and &lt;em&gt;programmagically&lt;/em&gt; ✨ connected both.&lt;/p&gt;

&lt;p&gt;In the next chapter we will fill in the gaps and describe how clients and data sources are connected, how to create &lt;code&gt;clients&lt;/code&gt; from &lt;code&gt;connections&lt;/code&gt;, and how data sources are filtered using a &lt;a href="https://rxjs.dev/api/index/function/combineLatest"&gt;projection and combination operator&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>rxjs</category>
      <category>typescript</category>
      <category>kafka</category>
      <category>socketio</category>
    </item>
  </channel>
</rss>
