Shawn Adams for Rockset

Posted on • Originally published at rockset.com on

Building a Live Dashboard on Streaming Data Using Amazon Kinesis and Rockset

Authored by Haneesh Reddy Poddutoori

We live in a world where diverse systems—social networks, monitoring, stock exchanges, websites, IoT devices—all continuously generate volumes of data in the form of events. One can perform a wide variety of analyses, like aggregations, filtering, or sampling, on these event streams, either at the record level or over sliding time windows. In this blog, I will show how Rockset can serve a live dashboard, which surfaces analytics on real-time Twitter data ingested into Rockset from an Amazon Kinesis stream.

Setting up a Kinesis Stream

The Python code snippet below shows how to create a Kinesis stream programmatically. This can also be achieved through the AWS Console or the AWS CLI.

import boto3
kinesis = boto3.client('kinesis') # requires AWS credentials to be present in env
kinesis.create_stream(StreamName='twitter-stream', ShardCount=5)
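
Stream creation is asynchronous: a new stream starts in the CREATING state and only accepts writes once it is ACTIVE. The following is a minimal sketch of waiting for that transition by polling `describe_stream_summary`. The helper name `wait_until_active` and its default timings are assumptions made for illustration and are not part of the original example; boto3 also provides a built-in `stream_exists` waiter that serves the same purpose.

```python
import time


def wait_until_active(kinesis, stream_name, timeout_s=120, poll_s=2, sleep=time.sleep):
    """Poll until the stream reports ACTIVE; return True, or False on timeout.

    `kinesis` is expected to be a boto3 Kinesis client (or any object exposing
    a compatible describe_stream_summary method). The helper name and the
    default timings are illustrative assumptions.
    """
    waited = 0
    while waited <= timeout_s:
        summary = kinesis.describe_stream_summary(StreamName=stream_name)
        status = summary["StreamDescriptionSummary"]["StreamStatus"]
        if status == "ACTIVE":
            return True
        sleep(poll_s)
        waited += poll_s
    return False
```

With the client created above, this would be called as `wait_until_active(kinesis, 'twitter-stream')` before starting the producer.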

Writing Tweets to Kinesis

Here, I use the Tweepy module to fetch tweets through Twitter's streaming search API, which lets me specify a list of terms to track (e.g. “music”, “facebook”, “apple”). You need a Twitter developer account to access the Twitter Streaming API. A StreamListener is registered to be notified whenever a tweet arrives. On each tweet, it writes the tweet data to the Kinesis stream under a randomly generated partition key, so records are spread roughly evenly across the stream's 5 shards.

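import random
import string

import boto3
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener  # Tweepy 3.x API

# twitter api credentials
access_token=...
access_token_secret=...
consumer_key=...
consumer_secret=...

# alphabet and length of the random partition key (illustrative values)
chars = string.ascii_letters + string.digits
size = 16

class TweetListener(StreamListener):
    def __init__(self, stream_name):
        super().__init__()
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def on_data(self, data):
        # a random partition key spreads records across the shards
        record = {}
        record['Data'] = data
        record['PartitionKey'] = ''.join(random.choice(chars) for _ in range(size))
        self.kinesis.put_records(Records=[record], StreamName=self.stream_name)

auth=OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream=Stream(auth, TweetListener("twitter-stream"))
search_terms=["music", "facebook", "apple"]
stream.filter(track=search_terms)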
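
To see why a random partition key spreads tweets across shards, it helps to know that Kinesis maps each partition key to a 128-bit integer using MD5 and routes the record to the shard whose hash key range contains that value. The sketch below imitates that mapping locally. It assumes the five shards split the hash space into equal ranges, which may not match a real stream exactly; the function names, key size, and alphabet are illustrative choices.

```python
import hashlib
import random
import string
from collections import Counter

HASH_SPACE = 2 ** 128  # Kinesis hashes partition keys to 128-bit integers via MD5


def shard_index(partition_key, shard_count=5):
    """Return which of `shard_count` equal hash ranges the key falls into.

    Assumes the shards divide the hash key space evenly; the real shard
    boundaries should be read from the stream description.
    """
    hashed = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return hashed * shard_count // HASH_SPACE


def random_key(size=16, chars=string.ascii_letters + string.digits):
    """Build a random partition key (size and alphabet are illustrative)."""
    return "".join(random.choice(chars) for _ in range(size))


if __name__ == "__main__":
    # Count how many of 10,000 random keys land in each simulated shard.
    counts = Counter(shard_index(random_key()) for _ in range(10_000))
    print(sorted(counts.items()))
```

Running the script should show the keys spread across all five simulated shards in roughly equal proportions, which is the behavior the producer relies on.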

Connecting Kinesis to Rockset

The following snippet shows how to create a collection in Rockset, backed by a Kinesis stream. Note: You need to create an Integration (an object that represents your AWS credentials) and set up relevant permissions on the Kinesis stream, which allows Rockset to perform certain read operations on that stream.

from rockset import Client, Q, F

rs = Client(api_key=...)
aws_integration = rs.Integration.retrieve(...)
sources = [
    rs.Source.kinesis(
        stream_name="twitter-stream",
        integration=aws_integration)]
twitter_kinesis_demo = rs.Collection.create("twitter-kinesis-demo", sources=sources)

Alternatively, collections can also be created from the Rockset console, as shown below.

[Screenshot: creating a Kinesis-backed collection in the Rockset console]

Building the Live Dashboard

Now that I have a producer writing tweets to a Kinesis stream and a collection to ingest them into Rockset, I can build a dashboard on top of this collection. My dashboard has two views.

Tweets View

The first view displays analytics on all the tweets coming into Rockset and has 3 panels, each of which makes its own query to Rockset.

[Screenshot: Tweets view of the dashboard]

Live Tweets

The Live Tweets panel constantly refreshes to show the latest tweets appearing in the collection. At a fixed refresh interval, it runs a query that selects the fields shown in the feed and keeps only tweets posted within the last minute.

SELECT t.timestamp_ms,
       t.created_at AS created_at,
       t.text AS text,
       t.user.screen_name AS screen_name
FROM "twitter-kinesis-demo" t
WHERE CAST(timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - minutes(1))
ORDER BY timestamp_ms DESC
LIMIT 100;
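
Twitter delivers `timestamp_ms` as a string of epoch milliseconds, which is why the query casts it before comparing it with the cutoff. As a rough illustration of the same filter, here is a client-side sketch in Python that operates on a list of tweet dictionaries. The function name and its parameters are hypothetical, and in the dashboard this work is done by Rockset rather than by application code.

```python
import time


def latest_tweets(tweets, now_ms=None, window_ms=60_000, limit=100):
    """Mimic the Live Tweets query on an in-memory list of tweet dicts."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    cutoff = now_ms - window_ms
    # timestamp_ms arrives as a string, so convert it before comparing
    recent = [t for t in tweets if int(t["timestamp_ms"]) > cutoff]
    recent.sort(key=lambda t: int(t["timestamp_ms"]), reverse=True)
    return recent[:limit]
```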

Top Hashtags

The Top Hashtags panel shows trending hashtags: those that appeared in the largest number of tweets in the last hour, along with the associated tweet count. In this query, a WITH clause defines a temporary relation, latest_hashtags, holding all hashtags that appeared in the last hour. The main query then groups latest_hashtags by hashtag and orders by tweet_count to obtain the trending hashtags.

WITH latest_hashtags AS
  (SELECT lower(ht.text) AS hashtag
   FROM "twitter-kinesis-demo" t,
        unnest(t.extended_tweet.entities.hashtags) ht
   WHERE CAST(t.timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - hours(1)))
SELECT count(hashtag) AS tweet_count,
       hashtag
FROM latest_hashtags
GROUP BY hashtag
ORDER BY tweet_count DESC
LIMIT 10;
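
The unnest-then-group pattern in this query can be easier to follow in procedural form. The sketch below reproduces the same logic in Python over a list of tweet dictionaries that are assumed to be already restricted to the last hour. The function name is hypothetical, and the nested field path simply mirrors the one used in the query.

```python
from collections import Counter


def top_hashtags(tweets, limit=10):
    """Count lowercased hashtags across tweets and return the most common."""
    counts = Counter(
        ht["text"].lower()
        for t in tweets
        # flatten each tweet's hashtag array, like unnest() in the query
        for ht in t.get("extended_tweet", {}).get("entities", {}).get("hashtags", [])
    )
    return counts.most_common(limit)
```

As in the SQL version, tweets without any hashtags contribute nothing to the result.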

Incoming Tweets

The last panel is a chart which shows the rate at which users are tweeting. We obtain data points for the number of incoming tweets every 2 seconds and plot them in a chart.

SELECT count(*)
FROM "twitter-kinesis-demo"
WHERE cast(timestamp_ms AS INT) > unix_millis(current_timestamp() - seconds(2));
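
On the client side, the chart needs a loop that runs this query every 2 seconds and keeps a bounded history of data points. Here is a minimal sketch of such a loop. `run_query` is a hypothetical callable standing in for whatever code executes the SQL above and returns the count; the function name, the history size, and the derived tweets-per-second value are assumptions for illustration.

```python
import time
from collections import deque


def collect_points(run_query, samples, interval_s=2.0, max_points=150,
                   sleep=time.sleep, clock=time.time):
    """Sample the tweet count `samples` times, one sample per interval.

    Returns a list of (timestamp, count, tweets_per_second) tuples, keeping
    only the most recent `max_points` entries for plotting.
    """
    points = deque(maxlen=max_points)
    for _ in range(samples):
        count = run_query()  # hypothetical: runs the count query above
        points.append((clock(), count, count / interval_s))
        sleep(interval_s)
    return list(points)
```

Bounding the history with `deque(maxlen=...)` keeps memory use constant no matter how long the dashboard stays open.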

Hashtags View

The second view displays analytics on tweets with a specific hashtag and also has 3 panels: Live Tweets, Related Hashtags, and Influencers. Each panel in the dashboard makes a query to Rockset. This is very similar to the first dashboard view but narrows the analytics to a selected hashtag of interest.

[Screenshot: Hashtags view of the dashboard]

Influencers

As we have narrowed our analysis to a single hashtag, it would be interesting to see who the most influential users are around this topic. For this, we define influencers as users with the highest number of followers who are tweeting the hashtag of interest.

SELECT t.user.screen_name,
       t.user.followers_count AS fc
FROM "twitter-kinesis-demo" t
WHERE 'music' IN
    (SELECT hashtags.text
     FROM unnest(t.entities.hashtags) hashtags)
GROUP BY (t.user.screen_name,
          t.user.followers_count)
ORDER BY t.user.followers_count DESC
LIMIT 5;
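
For readers less familiar with correlated subqueries, the same selection can be sketched in Python over a list of tweet dictionaries. The function name is hypothetical. Like the query, it matches the hashtag text exactly and groups on both the screen name and the follower count, so a user whose follower count changed between tweets can appear more than once.

```python
def top_influencers(tweets, hashtag, limit=5):
    """Return distinct (screen_name, followers_count) pairs, most followed first."""
    seen = set()
    for t in tweets:
        tags = [h["text"] for h in t.get("entities", {}).get("hashtags", [])]
        if hashtag in tags:  # exact, case-sensitive match as in the query
            seen.add((t["user"]["screen_name"], t["user"]["followers_count"]))
    return sorted(seen, key=lambda pair: pair[1], reverse=True)[:limit]
```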

Related Hashtags

This section is somewhat similar to the Top Hashtags panel we saw in the Tweets view of the dashboard. It shows the hashtags that co-occur most often along with our hashtag of interest.
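
The post does not show the query behind this panel, so the following is only a sketch of the co-occurrence logic, written in Python over a list of tweet dictionaries rather than in SQL. The function name, the choice to lowercase hashtags, and the `entities.hashtags` field path are assumptions borrowed from the other queries.

```python
from collections import Counter


def related_hashtags(tweets, hashtag, limit=10):
    """Count hashtags that appear in the same tweets as `hashtag`."""
    target = hashtag.lower()
    counts = Counter()
    for t in tweets:
        tags = {h["text"].lower() for h in t.get("entities", {}).get("hashtags", [])}
        if target in tags:
            # count every other hashtag once per tweet
            counts.update(tags - {target})
    return counts.most_common(limit)
```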

Live Tweets

The Live Tweets panel is very similar to the one we saw in the Tweets view of the dashboard. The only difference is that a new filter is applied to select only those tweets that contain our hashtag of interest. This is the same filter used for the other two panels in the Hashtags view.

Where to Go from Here

While I created this simple dashboard to illustrate how live analytics could be performed on data from Kinesis streams, Rockset supports millisecond-latency SQL that powers more complex, responsive dashboards as well.

You can refer to the full source code for this example here, if you are interested in building on streaming data using Rockset and Kinesis. Happy building!
