I recently created and published a tiny but handy project which provides a few custom user-defined functions (UDFs) to work with emojis in ksqlDB. While there is a minimum viable amount of project documentation, it's often much easier to understand how to make use of these functions, and what you can build with them, by walking through a small example application.
The goal is to build a simple, yet fault-tolerant and scalable stream processing backend to do near real-time tracking of emojis based on public tweets. Such an application can be used to feed a live web dashboard which, for instance, might show a continuously updated ranking of the most popular emojis derived from extracting and aggregating emoji occurrences found in tweets. It could look like the illustration below:
Thanks to ksqlDB, we can build all vital components for such an application in one coherent technology stack. Its unified streaming architecture:
- allows configuration-based data ingress and egress based on Apache Kafka Connect
- enables transformations, aggregations, and enrichments of data with a convenient SQL-like language
- and supports both streaming queries and point-in-time lookups against the managed state based on materialized views
Let's get started, step by step...
Step 1: Data Ingestion
Directly from within ksqlDB, we can manage Apache Kafka Connect source and sink connectors. For this example, we leverage a turn-key ready Twitter source connector from the community. Before you can deploy this connector, you have to obtain access credentials for the Twitter API, which then need to be configured for the source connector. The following ksql snippet - with the source connector configuration in the WITH clause - does the job and brings in public live tweets that match certain keywords:
CREATE SOURCE CONNECTOR `my-twitter-src-01` WITH (
'connector.class'='com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector',
'value.converter'='org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable'=false,
'key.converter'='org.apache.kafka.connect.json.JsonConverter',
'key.converter.schemas.enable'=false,
'twitter.oauth.accessToken'='...',
'twitter.oauth.consumerSecret'='...',
'twitter.oauth.consumerKey'='...',
'twitter.oauth.accessTokenSecret'='...',
'kafka.status.topic'='tweets',
'process.deletes'=false,
'filter.keywords'='coronavirus,2019nCoV,SARSCoV2,covid19,cov19'
);
NOTE: You have to replace all four twitter.oauth.* settings with your own access credentials for this to work properly.
Shortly after the source connector starts, it will produce JSON records into the configured Apache Kafka topic named tweets. To quickly inspect the data ingestion, we define our base stream, reduced to the essence of the original payload, like so:
-- 'create stream from tweets topic only using a few relevant payload fields'
CREATE STREAM tweets
(ID BIGINT, CREATEDAT BIGINT, TEXT VARCHAR,LANG VARCHAR,USER STRUCT<SCREENNAME VARCHAR>)
WITH (kafka_topic='tweets',value_format='JSON');
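To get a feel for the shape of the payload, here is a minimal Python sketch, assuming a hypothetical, simplified tweet record that only carries the fields declared in the stream above (the real connector payload contains many more fields):

```python
import json

# Hypothetical, simplified JSON payload shaped like the fields
# declared in the tweets stream above; field values are made up.
record = '''{
  "ID": 1244971755531354112,
  "CREATEDAT": 1585742400000,
  "TEXT": "Stay safe out there!",
  "LANG": "en",
  "USER": {"SCREENNAME": "some_user"}
}'''

tweet = json.loads(record)
# Access the same fields the ksqlDB stream exposes as columns,
# including the nested STRUCT field USER->SCREENNAME.
print(tweet["ID"], tweet["USER"]["SCREENNAME"])
```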
To inspect the output of this stream for a single ingested tweet we SELECT
just one record:
ksql> SELECT ID,TEXT FROM tweets EMIT CHANGES LIMIT 1;
+-------------------------------------+---------------------------------------------+
|ID |TEXT |
+-------------------------------------+---------------------------------------------+
|1244971755531354112                  |RT @Faytuks: #BREAKING🚨 - Sweden has confirm|
|                                     |ed 34 new #coronavirus deaths, raising the co|
|                                     |untry's death toll to 180 with 4 028 confirme|
|                                     |d cas…                                       |
Limit Reached
Query terminated
Step 2: Data Processing
Now, for the actual stream processing on top of the ingested data, we first create a derived stream tweets_emojis by applying our custom UDF EMOJIS_EXTRACT. You can find a short description of how to install these custom emoji handling functions into your ksql-server installation here.
-- 'create derived stream using the EMOJIS_EXTRACT function on the TEXT column'
CREATE STREAM tweets_emojis AS
SELECT EMOJIS_EXTRACT(TEXT,false) AS EMOJIS FROM tweets EMIT CHANGES;
This UDF extracts an array of any emojis contained in the raw TEXT column. The second parameter of type BOOLEAN allows us to choose between list (unique=false) or set (unique=true) semantics, which decides whether or not the extracted array may contain duplicate emoji entries.
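To illustrate the difference between the two modes, here is a plain-Python sketch of the list vs. set semantics. This is not the UDF's actual implementation - real emoji detection needs full Unicode handling - it only mimics the behaviour described above for a tiny, hard-coded emoji set:

```python
def extract_emojis(text, unique, emoji_set=frozenset("🚨❤🤣")):
    # Illustrative only: scan character by character for a small,
    # hard-coded set of single-codepoint emojis.
    found = [ch for ch in text if ch in emoji_set]
    if unique:
        # set semantics: drop duplicates, keep first-seen order
        found = list(dict.fromkeys(found))
    return found

print(extract_emojis("🚨🤣🚨", unique=False))  # list semantics: ['🚨', '🤣', '🚨']
print(extract_emojis("🚨🤣🚨", unique=True))   # set semantics:  ['🚨', '🤣']
```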
The next step is to flatten the emojis array, which we can easily achieve by applying ksqlDB's built-in UDTF called EXPLODE. Doing so will automatically skip any tweet which didn't contain any emojis - those are represented with an empty array in the tweets_emojis stream above.
-- 'create flattened stream to get each contained emoji of a tweet in a separate record'
CREATE STREAM tweets_emojis_flattened AS
SELECT EXPLODE(EMOJIS) AS EMOJI FROM tweets_emojis EMIT CHANGES;
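In plain Python terms, this flattening step behaves roughly like the following sketch, using a few made-up records shaped like the tweets_emojis stream (records with an empty emoji array simply contribute nothing to the output):

```python
# Illustrative records mimicking the tweets_emojis stream:
# one emoji array per tweet, possibly empty.
rows = [
    {"EMOJIS": ["🚨", "❤"]},
    {"EMOJIS": []},          # tweet without emojis -> skipped
    {"EMOJIS": ["🤣"]},
]

# EXPLODE emits one output record per array element.
flattened = [{"EMOJI": e} for row in rows for e in row["EMOJIS"]]
print(flattened)  # [{'EMOJI': '🚨'}, {'EMOJI': '❤'}, {'EMOJI': '🤣'}]
```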
The resulting stream contains all occurring emojis as separate records:
ksql> SELECT ROWKEY,EMOJI FROM tweets_emojis_flattened EMIT CHANGES LIMIT 10;
+--------------------------------+--------------------------------+
|ROWKEY |EMOJI |
+--------------------------------+--------------------------------+
|{"Id":1244971755531354112}      |🚨                              |
|{"Id":1244971755996880897}      |😢                              |
|{"Id":1244971756122550272}      |😡                              |
|{"Id":1244971757011902469}      |🙏🏻                              |
|{"Id":1244971760203845632}      |π½                             |
|{"Id":1244971760203845632}      |β£                             |
|{"Id":1244971760203845632}      |🇷🇺                              |
|{"Id":1244971760203845632}      |π                              |
|{"Id":1244971760639827968}      |❤                              |
|{"Id":1244971760287731718}      |🤣                              |
Limit Reached
Query terminated
This stream serves as input to create a table by doing a simple aggregation to group and count the emojis:
-- 'create a table which continuously calculates the total count of each specific emoji'
CREATE TABLE tweets_emoji_counts AS
SELECT EMOJI,COUNT(*) AS TOTAL_COUNT FROM tweets_emojis_flattened
GROUP BY EMOJI EMIT CHANGES;
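Conceptually, this aggregation maintains a continuously updated counter per emoji, much like the Python sketch below over a finite batch of records. ksqlDB, of course, does this incrementally over the unbounded stream and keeps the result as managed, materialized state:

```python
from collections import Counter

# Illustrative flattened emoji records, one emoji per record,
# as produced by the tweets_emojis_flattened stream.
emojis = ["🚨", "❤", "🚨", "🤣", "🚨"]

# Equivalent of GROUP BY EMOJI + COUNT(*) over a finite batch:
counts = Counter(emojis)
print(counts["🚨"])  # 3
```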
Step 3: Queries
ksqlDB provides two different types of queries against tables:
Pull Queries
Pull queries can be used to do point-in-time lookups against the continuously updated state which is represented by the table. We can retrieve the current total count of any specific emoji by specifying it as the ROWKEY in the WHERE clause:
-- 'pull query to retrieve single point-in-time emoji count'
SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts WHERE ROWKEY = 'π';
ksql> SELECT EMOJI,TOTAL_COUNT FROM
tweets_emoji_counts WHERE ROWKEY = 'π';
+--------------------+--------------------+
|EMOJI |TOTAL_COUNT |
+--------------------+--------------------+
|π |20 |
Query terminated
Push Queries
Push queries are meant to run indefinitely and emit results every time the underlying data stream changes or the table state is updated. We can subscribe to a continuously updated stream of total counts for a single emoji, several specific emojis, or all emojis:
-- 'PUSH query: get change stream for single emoji counter'
SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts
WHERE emoji = 'π' EMIT CHANGES;
ksql> SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts
WHERE emoji = 'π' EMIT CHANGES;
+---------------------+---------------------+
|EMOJI |TOTAL_COUNT |
+---------------------+---------------------+
|π |55 |
|π |56 |
|π |59 |
|π |61 |
^CQuery terminated
-- 'PUSH query: change stream for several emoji counters - there is no IN(...) yet :('
SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts
WHERE emoji = 'π' OR emoji = 'π¨' OR emoji = 'π' EMIT CHANGES;
ksql> SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts
WHERE emoji = 'π' OR emoji = 'π¨' OR emoji = 'π' EMIT CHANGES;
+---------------------+---------------------+
|EMOJI |TOTAL_COUNT |
+---------------------+---------------------+
|π |48 |
|π |66 |
|π¨ |109 |
|π |49 |
|π¨ |111 |
|π |50 |
|π¨ |112 |
|π |67 |
^CQuery terminated
Step 4: Serving Data to Clients
Thanks to its HTTP / REST API, ksqlDB lets us directly expose both push and pull queries to consuming clients. For instance, we can run the needed queries to build a live dashboard which visualizes the data streams and offers end-users a simple form-based query mechanism. Below is a cURL snippet which, when run against the REST API, shows the output of a PULL query to retrieve the current count for the π emoji:
curl -s -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d $'{
"ksql": "SELECT EMOJI,TOTAL_COUNT FROM tweets_emoji_counts WHERE ROWKEY = \'π\';",
"streamsProperties": {}
}' | jq '.[1] | {emoji: .row.columns[0],count: .row.columns[1]}'
which for my particular data results in:
{
"emoji": "π",
"count": 2198
}
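In an application, you would typically call the endpoint from code rather than cURL. The sketch below shows only the response parsing, using a hard-coded sample payload shaped like the ksqlDB /query response above, with a hypothetical emoji, count, query id, and schema string. The HTTP call itself (e.g. with the requests library) is omitted so the example stays self-contained:

```python
import json

# Hypothetical sample payload shaped like a ksqlDB /query pull query
# response: a JSON array with a header element followed by row elements.
payload = '''[
  {"header": {"queryId": "query_1", "schema": "illustrative"}},
  {"row": {"columns": ["🤣", 42]}}
]'''

response = json.loads(payload)
# Equivalent of the jq filter: take the first row element
# and map its columns to named fields.
row = response[1]["row"]["columns"]
result = {"emoji": row[0], "count": row[1]}
print(result)  # {'emoji': '🤣', 'count': 42}
```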