Team: Vineet Kaul, Ajay Kumar
What are we talking about?
We are talking about streams. Streams of data originating from applications, social media, sensors, devices, websites, blah, blah, blah. The great book of Wikipedia describes streaming data as "data that is continuously generated by different sources." Streaming data is ubiquitous, and if you think you've never seen it in action, think again:
- Sensors in transportation vehicles, industrial equipment, and farm machinery.
- Stock market data, portfolios, and value-at-risk calculations in real time.
- Clickstream records from digital content.
- Data about player-game interactions in online gaming.
All this sounds interesting, but managing streaming data does not come without its fair share of challenges. Broadly speaking, they fall into three areas:
- Processing: Streaming data requires real-time data pipelines to manage the flow: correlation, aggregation, filtering, and sampling.
- Storage: The business needs a view of the data, either aggregated or over sliding time windows. But storing everything is commercially prohibitive because the data keeps growing without bound.
- Display: The business needs an aggregated, windowed view, but the data can't all be saved, so how can it be viewed? Hence the need for a dynamic, unified interface that gives a window onto the data.
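To make the storage point concrete, here is a minimal Python sketch (our own illustration, not code from the hack) of a mean over a sliding time window. Only the points inside the window are kept, so memory stays bounded however long the stream runs. The class name and the use of millisecond timestamps are assumptions.

```python
from collections import deque


class SlidingWindowMean:
    """Mean of the values seen in the last `window_ms` milliseconds.

    Hypothetical helper: only points inside the window are retained, so
    memory use does not grow with the length of the stream.
    """

    def __init__(self, window_ms: int) -> None:
        if window_ms <= 0:
            raise ValueError("window_ms must be positive")
        self.window_ms = window_ms
        self._points = deque()  # (timestamp_ms, value) pairs, oldest first
        self._total = 0.0

    def add(self, timestamp_ms: int, value: float) -> float:
        """Record a point and return the mean over the current window."""
        self._points.append((timestamp_ms, value))
        self._total += value
        # Evict points that have slid out of the window (assumes timestamps
        # arrive in non-decreasing order).
        cutoff = timestamp_ms - self.window_ms
        while self._points and self._points[0][0] <= cutoff:
            _, old_value = self._points.popleft()
            self._total -= old_value
        return self._total / len(self._points)
```

For example, with a 1,000 ms window, adding 10.0 at t=0, 20.0 at t=500 and 30.0 at t=1200 returns 10.0, 15.0 and 25.0: the first point has left the window by the third call.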
Houston, do we have a problem?
Not exactly. There is a way to supercharge customer apps with streaming data by using technology components that take care of the underlying complexities, so developers can focus on delivering value to the business.
Business use case:
Personae:
Art Vandalay is an investor with interests in cryptocurrency trading. Art trades on various crypto exchanges across various cryptocurrencies. Art is also a customer of Chemical Bank and would like his banking app to help him track cryptocurrency exchange rates.
Chemical Bank is just about to do that with its app, Crypto Exchange.
Cryptocurrency trade basics:
Source: Investopedia
Trading cryptocurrencies requires a currency pair, that is, the value of one currency quoted against another. The first currency in the pair is called the base currency, and the second is called the quote currency. The pair's price indicates how much of the quote currency is needed to buy one unit of the base currency. Fiat currencies are identified by ISO 4217 codes (e.g., USD); cryptocurrencies, which have no official ISO code, are identified by ticker symbols (e.g., BTC, LTC).
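The base/quote relationship can be sketched in Python as a worked example. The `Quote` class is our own hypothetical illustration, not part of the hack; it uses the litecoin/tether price (68.03) from the sample message in the Mulesoft section, written with the tickers LTC and USDT.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Quote:
    """A quote for a currency pair written BASE/QUOTE (hypothetical helper).

    `price` is how many units of the quote currency buy one unit of the
    base currency.
    """
    base: str
    quote: str
    price: float

    def cost_in_quote(self, base_amount: float) -> float:
        """Quote-currency amount needed to buy `base_amount` of the base currency."""
        return base_amount * self.price

    def inverted(self) -> "Quote":
        """The same rate seen from the other side: QUOTE/BASE."""
        return Quote(self.quote, self.base, 1 / self.price)


ltc_usdt = Quote("LTC", "USDT", 68.03)
```

Buying 2 LTC at that price costs 2 × 68.03 = 136.06 USDT, and `inverted()` gives the USDT/LTC rate, 1/68.03 ≈ 0.0147 LTC per USDT.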
The trading rates of cryptocurrencies change in real-time. To develop Crypto Exchange, Chemical Bank would need to hook up to an exchange, ingest streaming data on exchange rates of hundreds of pairs, and display it in a dynamic unified interface for Mr. Vandalay's trading pleasure.
Chemical Bank is going to do just that using the Salesforce technology stack.
Architecture:
The solution takes CoinCap as a source of cryptocurrency trading data. Why? It was simply convenient. It could be any of the many data sources on the internet that stream exchange data.
Commentary:
The Mulesoft WebSockets Connector connects to the CoinCap WebSocket. The CoinCap trades WebSocket streams exchange rate data relayed from other cryptocurrency exchanges' WebSockets. Because the system is designed for streaming data rather than API callouts, the Mulesoft WebSockets Connector opens and maintains a persistent TCP connection between the client (Crypto Exchange) and the server (CoinCap) until the app closes it.
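In the hack, the Mulesoft WebSockets Connector handles the connection. For readers who want to see the same client-side loop outside Mule, here is a minimal Python sketch that turns a stream of raw text frames into trade dictionaries. It assumes the third-party `websockets` package for the live connection and the endpoint `wss://ws.coincap.io/trades/binance`; both are our assumptions, and CoinCap's endpoints may have changed since this post. `replay` is a hypothetical stand-in for a live socket so the loop can be exercised offline.

```python
import asyncio
import json
from typing import AsyncIterator, Iterable

# Fields the downstream flow needs (an assumption based on the sample message).
REQUIRED_FIELDS = {"base", "quote", "price", "timestamp"}


async def trades(raw_messages: AsyncIterator[str]) -> AsyncIterator[dict]:
    """Yield a dict for every well-formed trade frame in `raw_messages`.

    With the `websockets` package, `raw_messages` could be the connection
    itself (endpoint is an assumption):

        async with websockets.connect("wss://ws.coincap.io/trades/binance") as ws:
            async for trade in trades(ws):
                print(trade["base"], trade["quote"], trade["price"])
    """
    async for raw in raw_messages:
        try:
            trade = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip a malformed frame rather than dropping the stream
        if isinstance(trade, dict) and REQUIRED_FIELDS.issubset(trade):
            yield trade


async def replay(frames: Iterable[str]) -> AsyncIterator[str]:
    """Hypothetical stand-in for a live socket: yields canned frames."""
    for frame in frames:
        yield frame


if __name__ == "__main__":
    async def main() -> None:
        frames = [
            '{"base": "litecoin", "quote": "tether", "price": 68.03, "timestamp": 1606478960197}',
            "not json",
        ]
        async for trade in trades(replay(frames)):
            print(trade)

    asyncio.run(main())
```

Skipping bad frames instead of raising keeps one corrupt message from tearing down a long-lived connection.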
Streaming data from any source can be overwhelming. Downstream systems might not be able to process or store data at a very high rate. The design alternative is to aggregate or throttle the data, as long as this does not affect the business use case. The Mulesoft Aggregators Module stores the exchange values and releases the most recent message based on its configuration (count or time). As long as the lag is acceptable, the downstream flow can be optimized. Remember, Mr. Vandalay just wants to track cryptocurrency exchange rates, not trade through the app. In principle, the current strategy implements throttling rather than aggregation. That said, the same strategy could be used for use cases that do require aggregation, e.g., calculating the mean exchange rate over a time period.
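A time-based "keep the latest" throttle can be sketched in Python as follows. This is our own illustration, not the Mule Aggregators Module API, and the class and method names are hypothetical. To keep it deterministic, a window is closed by the arrival of the next message rather than by a timer; a real time-based aggregator would also release on a schedule when the stream goes quiet.

```python
class LatestPerInterval:
    """Time-based throttle: of all messages in an interval, release only the latest.

    Hypothetical sketch; windows are closed by message arrival, not by a timer.
    """

    def __init__(self, interval_ms: int) -> None:
        self.interval_ms = interval_ms
        self._window_start = None  # timestamp of the first message in the window
        self._latest = None        # most recent message seen in the window

    def offer(self, timestamp_ms: int, message):
        """Buffer `message`; return the previous window's latest message
        once the interval has elapsed, otherwise None."""
        released = None
        if self._window_start is None:
            self._window_start = timestamp_ms
        elif timestamp_ms - self._window_start >= self.interval_ms:
            released = self._latest         # close the old window
            self._window_start = timestamp_ms  # this message opens a new one
        self._latest = message
        return released

    def flush(self):
        """Release whatever is buffered (e.g. on shutdown) and reset."""
        released, self._latest, self._window_start = self._latest, None, None
        return released
```

However fast the source produces messages, at most one per interval reaches the downstream system.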
After processing, Mulesoft sends the data to the downstream app, i.e., Crypto Exchange, for display. As there is neither the need nor the technical feasibility to store the data, the app displays the exchange data graphically. Mulesoft publishes a Salesforce platform event to which the app's UI components subscribe. The lightning:empApi component receives the event, and the data is displayed on Chart.js graphs. Using Salesforce platform events simplifies communication by decoupling the event producer (the Mulesoft aggregator) from its subscribers. In addition, multiple apps can subscribe to the same event.
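The decoupling that platform events provide can be illustrated with a toy in-process bus in Python. This only models the idea and is not Salesforce's event bus: publishers and subscribers share nothing but a channel name. The channel `/event/Crypto_Rate__e` follows the `/event/<EventName>__e` naming used for platform event channels, but the event name itself is hypothetical.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class EventBus:
    """Toy stand-in for a publish/subscribe event bus (illustration only)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, channel: str, handler: Handler) -> None:
        """Register `handler` for every future event on `channel`."""
        self._subscribers[channel].append(handler)

    def publish(self, channel: str, payload: Dict[str, Any]) -> int:
        """Deliver `payload` to every subscriber; return how many received it.

        The publisher never learns who, if anyone, is listening.
        """
        handlers = self._subscribers.get(channel, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)


CHANNEL = "/event/Crypto_Rate__e"  # hypothetical event name
```

Adding a second app is just another `subscribe` call; the publisher does not change.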
And in three simple steps, we have an end-to-end flow that connects to a crypto exchange and displays the data in a customer app.
Mulesoft:
The Listener: The On New Inbound Message source listens for connection requests and calls the flow that opens the connection through the Open outbound socket operation.
The Aggregator: A group-based aggregator collects the exchange rates by combination of base and quote currency and releases the latest rate every "x" messages (where "x" can be any suitable batch size). This scheme is called size-based aggregation. Time-based aggregation could be designed along the same lines.
Once again, this is done to throttle the message stream and make it easier for downstream systems to ingest and process the exchange rate messages. The Mulesoft orchestration imposes a fixed, non-overlapping window on the data stream, of which only one message, the latest, is sent downstream. This chunking of the data lets the architecture accommodate customer-facing applications that are either not designed for streaming data or have platform governance limitations.
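The size-based, group-by-pair scheme can be sketched in Python. The class is hypothetical and is not the Mule Aggregators Module API. Because only the latest message of each group is released, the sketch does not need to buffer messages at all: counting per (base, quote) key is enough, and the message that completes a group is by definition the latest one.

```python
from collections import defaultdict


class SizeBasedThrottle:
    """Group messages by (base, quote); every `group_size` messages in a
    group, release only the latest one and start a new group.

    Hypothetical sketch of size-based aggregation used as a throttle.
    """

    def __init__(self, group_size: int) -> None:
        if group_size < 1:
            raise ValueError("group_size must be at least 1")
        self.group_size = group_size
        self._counts = defaultdict(int)  # messages seen so far per currency pair

    def offer(self, message: dict):
        """Return `message` if it completes its pair's group, else None."""
        key = (message["base"], message["quote"])
        self._counts[key] += 1
        if self._counts[key] == self.group_size:
            self._counts[key] = 0  # group complete: start a fresh one
            return message         # the completing message is the latest one
        return None
```

Pairs are counted independently, so a busy pair such as litecoin/tether cannot starve a quieter one.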
A single pair message:
{exchange=binance, base=litecoin, quote=tether, direction=buy, price=68.03, volume=12.82018, timestamp=1606478960197, priceUsd=68.14666514408307}
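A typed view of this message makes its fields explicit. The Python sketch below is our own illustration; the class name is hypothetical, `price_usd` is assumed to be the same price expressed in US dollars, and `timestamp` is read as milliseconds since the Unix epoch.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class PairTrade:
    """Typed view of one trade message (field names mirror the sample)."""
    exchange: str
    base: str
    quote: str
    direction: str
    price: float      # quote-currency units per one unit of base currency
    volume: float     # traded volume as reported by the source
    timestamp: int    # milliseconds since the Unix epoch
    price_usd: float  # assumed: the same price expressed in US dollars

    @property
    def traded_at(self) -> datetime:
        """Timestamp as an aware UTC datetime, keeping millisecond precision."""
        seconds, millis = divmod(self.timestamp, 1000)
        return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)

    @property
    def quote_usd_rate(self) -> float:
        """Implied USD value of one unit of the quote currency."""
        return self.price_usd / self.price


sample = PairTrade(
    exchange="binance", base="litecoin", quote="tether", direction="buy",
    price=68.03, volume=12.82018, timestamp=1606478960197,
    price_usd=68.14666514408307,
)
```

For this sample, `traded_at` is 2020-11-27 12:09:20.197 UTC and `quote_usd_rate` is about 1.0017, i.e., one tether was valued slightly above one US dollar in that trade.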
Aggregated message used for throttling. Only the latest message in a time or count window is passed on.
[TypedValue[value: '{exchange=binance, base=litecoin, quote=tether, direction=buy, price=67.65, volume=1.16843, timestamp=1606479662239, priceUsd=67.75263835061388}', dataType: 'MapDataType{type=java.util.LinkedHashMap, keyType=SimpleDataType{type=java.lang.Object, mimeType='*/*'}, valueType=SimpleDataType{type=java.lang.Object, mimeType='*/*'}, mimeType='application/java; charset=UTF-8'}']]
The Publisher: Once the throttled messages are released, the Salesforce connector publishes a Salesforce platform event to the connected Org. The custom platform event is defined in the connected Org.
Platform events are built for handling streaming data. They also allow data to be processed and displayed without being stored, avoiding costly database operations.
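In the hack, the Mulesoft Salesforce connector does the publishing. As an alternative illustration only, the same kind of event can be published through the Salesforce REST API by creating a record of the event's sObject type. The Python sketch below builds that request with the standard library without sending it. The event name `Crypto_Rate__e`, the fields `Base__c`, `Quote__c` and `Price__c`, and the API version are assumptions; the real names come from the platform event definition in the connected Org, and the instance URL and access token would come from an OAuth flow not shown here.

```python
import json
import urllib.request

# Hypothetical names: the real event and field API names live in the Org's
# platform event definition and may differ.
EVENT_NAME = "Crypto_Rate__e"
API_VERSION = "v50.0"


def build_publish_request(instance_url: str, access_token: str,
                          trade: dict) -> urllib.request.Request:
    """Build (but do not send) a REST request that publishes one platform event."""
    body = {
        "Base__c": trade["base"],
        "Quote__c": trade["quote"],
        "Price__c": trade["price"],
    }
    return urllib.request.Request(
        url=f"{instance_url}/services/data/{API_VERSION}/sobjects/{EVENT_NAME}/",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the request to `urllib.request.urlopen` would send it; with valid credentials, subscribers to `/event/Crypto_Rate__e` would then receive the event.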
Salesforce:
The Consumer: Display of data on dynamic graphs requires the UI component to receive the published messages. This is implemented by the lightning:empApi component that provides access to methods for subscribing to a streaming channel and listening to event messages. The lightning:empApi component uses a shared CometD connection, enabling dynamic display of streaming data on UI components.
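In the app, this consumer logic runs as JavaScript inside the Lightning component that receives events from lightning:empApi and feeds Chart.js. The core idea can be sketched in Python: keep a bounded, per-pair series of recent prices, so the chart always shows a moving window without anything being stored. The class name, the `max_points` default, and the event field names are assumptions.

```python
from collections import defaultdict, deque
from typing import List, Tuple


class ChartSeries:
    """Keeps the last `max_points` prices per currency pair, ready to hand
    to a charting library as parallel label/data arrays (hypothetical)."""

    def __init__(self, max_points: int = 50) -> None:
        # deque(maxlen=...) silently drops the oldest point when full.
        self._series = defaultdict(lambda: deque(maxlen=max_points))

    def on_event(self, event: dict) -> None:
        """Handle one received event by appending its price to the pair's series."""
        key = f'{event["base"]}/{event["quote"]}'
        self._series[key].append((event["timestamp"], event["price"]))

    def chart_data(self, pair: str) -> Tuple[List[int], List[float]]:
        """Return (timestamps, prices) for `pair`, oldest first."""
        points = self._series.get(pair, ())
        return [t for t, _ in points], [p for _, p in points]
```

Because each series is capped, memory stays constant however long the page stays open.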
Watch it work:
Can I get you the code? Yeah, sure.
So, you are not faint-hearted. Fair enough. We have published the code and full setup and test instructions on GitHub. Follow the lead.
- For the Salesforce app code and detailed instructions to set up and test the hack, refer to the repo below. It covers the full hack, i.e., both the Salesforce and Mulesoft parts.
Crypto Exchange
How to integrate your Salesforce application with streaming data from Crypto Exchange.
Part 1: Set up the Salesforce Org.
- Click the button below to install the DriveSafe application package. The Salesforce Org should be Enterprise Edition or above.
- Import seed data into the Org
sfdx force:data:tree:import -p data/Trade_Asset__c-plan.json -u <Org Alias>
- Another alternative is to convert the data file to CSV and upload the Trade_Asset__c.Name field using Data Loader, Workbench, or your favorite data management tool.
- Open the Salesforce1 app and go to App Launcher.
- Search and select Crypto Exchange app.
- Go to trades and click on the New button.
- A form appears for adding a new trade. Select the Base Asset and Quote Asset.
- If Trade_Asset__c data has been properly uploaded, options would appear.
- Any possible…
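For the CSV alternative mentioned in the setup steps, here is a minimal Python sketch that flattens an sObject tree data file (the `{"records": [...]}` format used by `sfdx force:data:tree:import`) into CSV text for Data Loader or Workbench. The function name is hypothetical, and the data file's actual name and contents are not given here, so reading and writing files is left to the caller.

```python
import csv
import io
import json
from typing import List


def tree_to_csv(tree_json: str, fields: List[str]) -> str:
    """Flatten an sObject tree data file ({"records": [...]}) into CSV text.

    The `attributes` entry (type/referenceId) and any other unlisted keys
    are dropped; only the requested fields are written, e.g. ["Name"].
    """
    records = json.loads(tree_json)["records"]
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=fields,
                            extrasaction="ignore", lineterminator="\n")
    writer.writeheader()
    for record in records:
        writer.writerow(record)
    return out.getvalue()
```

Calling it with `["Name"]` produces a one-column file that maps directly onto Trade_Asset__c.Name.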
- If you are interested in the Mulesoft project, refer to the repo below.
Have fun trying out the hack, and do let us know if you face any issues or have general feedback.