Akmal Chaudhri for SingleStore


Quick tip: Streaming data from MongoDB Atlas to SingleStore Kai using Kafka and CDC

Abstract

SingleStore provides a Change Data Capture (CDC) solution, currently in preview, to stream data from MongoDB to SingleStore Kai. In this article, we'll see how to connect an Apache Kafka broker to MongoDB Atlas and then stream the data from MongoDB Atlas to SingleStore Kai using the CDC solution. We'll also use Metabase to create a simple analytics dashboard for SingleStore Kai.

The notebook file used in this article is available on GitHub.

Introduction

CDC is a technique for tracking the changes (inserts, updates and deletes) made to a database so that they can be propagated to other systems. SingleStore now provides a CDC solution, currently in preview, that works with MongoDB.
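
To make the idea concrete, here is a minimal, purely illustrative sketch of the principle behind CDC: a source emits an ordered stream of change events, and a target applies them to stay in sync. The event shape (`op`, `key`, `doc`) and the `apply_change` function are assumptions invented for this illustration; they are not the change-event format used by SingleStore or MongoDB.

```python
from typing import Any

# Hypothetical change-event shape, invented for this illustration:
# {"op": "insert" | "update" | "delete", "key": ..., "doc": {...}}
ChangeEvent = dict[str, Any]


def apply_change(target: dict[Any, dict], event: ChangeEvent) -> None:
    """Apply one change event to an in-memory replica of a collection."""
    op, key = event["op"], event["key"]
    if op == "insert":
        target[key] = dict(event["doc"])
    elif op == "update":
        # Merge the changed fields into the existing document.
        target.setdefault(key, {}).update(event["doc"])
    elif op == "delete":
        target.pop(key, None)
    else:
        raise ValueError(f"Unknown operation: {op}")


replica: dict[Any, dict] = {}
changes = [
    {"op": "insert", "key": 1, "doc": {"event_name": "Impression", "region": "Michigan"}},
    {"op": "update", "key": 1, "doc": {"event_name": "Click"}},
    {"op": "insert", "key": 2, "doc": {"event_name": "Impression", "region": "Alberta"}},
    {"op": "delete", "key": 2},
]

# Replaying the events in order brings the replica up to date with the source.
for change in changes:
    apply_change(replica, change)

print(replica)
```

A real CDC pipeline also has to handle ordering, retries and an initial snapshot, none of which this toy model attempts.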

To demonstrate the CDC solution, we'll use a Kafka broker to stream data to a MongoDB Atlas cluster and then use the CDC pipeline to propagate the data from MongoDB Atlas to SingleStore Kai. We'll also create a simple analytics dashboard using Metabase.

Figure 1 shows the high-level architecture of our system.

Figure 1. High-Level Architecture (Source: SingleStore).


We'll focus on other scenarios using the CDC solution in future articles.

MongoDB Atlas

We'll use MongoDB Atlas in an M0 Sandbox. We'll configure an admin user with atlasAdmin privileges under Database Access. We'll temporarily allow access from anywhere (IP Address 0.0.0.0/0) under Network Access. We'll note down the username, password and host.

Apache Kafka

We'll configure a Kafka broker to stream data into MongoDB Atlas. We'll use a Jupyter notebook to achieve this.

First, we'll install some libraries:

!pip install pymongo kafka-python --quiet

Next, we'll connect to MongoDB Atlas and the Kafka broker:

from kafka import KafkaConsumer
from pymongo import MongoClient

# Connect to MongoDB Atlas and start from an empty adtech database.
try:
    client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")
    db = client["adtech"]
    client.drop_database(db)
    print("Connected to MongoDB successfully")
except Exception as e:
    print(f"Could not connect to MongoDB: '{e}'")

# Subscribe to the ad_events topic on the Kafka broker.
try:
    consumer = KafkaConsumer(
        "ad_events",
        bootstrap_servers = ["public-kafka.memcompute.com:9092"]
    )
    print("Connected to Kafka consumer successfully")
except Exception as e:
    print(f"Could not connect to Kafka: '{e}'")

We'll replace <username>, <password> and <host> with the values that we saved earlier from MongoDB Atlas.
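
If the username or password contains characters such as `@`, `:` or `/`, they need to be percent-encoded before being placed in the connection string. A minimal sketch using only the standard library follows; the helper name `build_atlas_uri` and the sample credentials and host are hypothetical.

```python
from urllib.parse import quote_plus


def build_atlas_uri(username: str, password: str, host: str) -> str:
    """Build a mongodb+srv URI with percent-encoded credentials."""
    return (
        f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}"
        f"@{host}/?retryWrites=true&w=majority"
    )


# Placeholder values for illustration only.
uri = build_atlas_uri("admin", "p@ss:word/1", "cluster0.example.mongodb.net")
print(uri)
```

The resulting string can be passed to `MongoClient` in place of the hand-written URI.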

Initially, we'll load 100 records into MongoDB Atlas, as follows:

MAX_ITERATIONS = 100
BATCH_SIZE = 10

buffer = []

for iteration, message in enumerate(consumer, start = 1):
    if iteration > MAX_ITERATIONS:
        break

    try:
        record = message.value.decode("utf-8")
        fields = list(map(str.strip, record.split("\t")))

        if len(fields) == 9:
            user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = fields

            events_record = {
                "user_id": int(user_id),
                "event_name": event_name,
                "advertiser": advertiser,
                "campaign": int(campaign.split()[0]),
                "gender": gender,
                "income": income,
                "page_url": page_url,
                "region": region,
                "country": country
            }

            buffer.append(events_record)

        if len(buffer) >= BATCH_SIZE:
            db.events.insert_many(buffer)
            buffer.clear()

    except Exception as e:
        print(f"Iteration {iteration}: Could not process data - {str(e)}")

if buffer:
    db.events.insert_many(buffer)

The data should load successfully and we should see a database called adtech with a collection called events. Documents in the collection should be similar in structure to the following example:

_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below"
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"

These documents represent Ad Campaign events. The events collection stores details of the advertiser, campaign and various demographic information about the user, such as gender and income.
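
The parsing logic in the loader can also be pulled out into a small function so that it can be checked without a broker or a database. The sketch below mirrors the field handling in the loading loop shown earlier; the function name `parse_event` and the sample message are hypothetical, and the assumption that the campaign field starts with an integer is taken from the loader code rather than from any published schema.

```python
from typing import Optional

FIELD_NAMES = (
    "user_id", "event_name", "advertiser", "campaign",
    "gender", "income", "page_url", "region", "country",
)


def parse_event(raw: bytes) -> Optional[dict]:
    """Turn one tab-separated Kafka message into a document, or None if malformed."""
    fields = [field.strip() for field in raw.decode("utf-8").split("\t")]
    if len(fields) != len(FIELD_NAMES):
        return None

    record = dict(zip(FIELD_NAMES, fields))
    record["user_id"] = int(record["user_id"])
    # As in the loader, keep only the leading integer of the campaign field.
    record["campaign"] = int(record["campaign"].split()[0])
    return record


# Hypothetical sample message, shaped like the example document above.
sample = (
    b"3857963415\tImpression\tSherwin-Williams\t13\tFemale\t25k and below\t"
    b"/2013/02/how-to-make-glitter-valentines-heart-boxes.html/\tMichigan\tUS"
)

print(parse_event(sample))
print(parse_event(b"too\tfew\tfields"))
```

Messages that do not have exactly nine fields are skipped, which is why the number of documents loaded can be lower than MAX_ITERATIONS.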

SingleStore Kai

A previous article showed the steps to create a free SingleStoreDB Cloud account. We'll use the following settings:

  • Workspace Group Name: CDC Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: cdc-demo
  • Size: S-00
  • Settings:
    • SingleStore Kai selected

Once the workspace is available, we'll make a note of our password and host. The host will be available from cdc-demo > Connect > SQL IDE > Host. We'll need this information later for Metabase. We'll also temporarily allow access from anywhere by configuring the firewall under CDC Demo Group > Firewall.

From the left navigation pane, we'll select DEVELOP > Data Studio > Open SQL Editor to create an adtech database and link, as follows:

DROP DATABASE IF EXISTS adtech;
CREATE DATABASE IF NOT EXISTS adtech;

USE adtech;

DROP LINK adtech.link;

CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017",
        "collection.include.list": "adtech.*",
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>",
            "mongodb.password": "<password>"}';

CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;

We'll replace <username> and <password> with the values that we saved earlier from MongoDB Atlas. We'll also need to replace the values for <primary>, <secondary> and <secondary> with the full address for each from MongoDB Atlas.
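
Because the CONFIG and CREDENTIALS clauses are JSON strings, one way to avoid quoting mistakes is to generate them programmatically. The following sketch does this with the standard library; the host names are placeholders, the helper name `build_link_config` is hypothetical, and the keys are simply those used in the statement above.

```python
import json


def build_link_config(hosts: list[str], port: int = 27017) -> str:
    """Return the JSON for the CONFIG clause of CREATE LINK ... AS MONGODB."""
    config = {
        "mongodb.hosts": ", ".join(f"{host}:{port}" for host in hosts),
        "collection.include.list": "adtech.*",
        "mongodb.ssl.enabled": "true",
        "mongodb.authsource": "admin",
        "mongodb.members.auto.discover": "false",
    }
    return json.dumps(config, indent=2)


# Placeholder host names; use the primary and secondary addresses from Atlas.
hosts = [
    "shard-00-00.example.mongodb.net",
    "shard-00-01.example.mongodb.net",
    "shard-00-02.example.mongodb.net",
]

print(build_link_config(hosts))
```

The printed JSON can be pasted between the single quotes after CONFIG in the CREATE LINK statement.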

We'll now check for any tables, as follows:

SHOW TABLES;

This should show one table called events:

+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

We'll check the structure of the table:

DESCRIBE events;

The output should be as follows:

+-------+----------+------+------+---------+----------+
| Field | Type     | Null | Key  | Default | Extra    |
+-------+----------+------+------+---------+----------+
| _id   | bson     | NO   |      | NULL    |          |
| _more | bson     | NO   |      | NULL    |          |
| $_id  | longblob | NO   | PRI  | NULL    | computed |
+-------+----------+------+------+---------+----------+

Next, we'll check for any pipelines:

SHOW PIPELINES;

This will show one pipeline called events that is currently Stopped:

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| adtech.events       | Stopped | False     |
+---------------------+---------+-----------+

Now we'll start the events pipeline:

START ALL PIPELINES;

and the state should change to Running:

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| adtech.events       | Running | False     |
+---------------------+---------+-----------+

If we now run the following command:

SELECT COUNT(*) FROM events;

it should return 100 as the result:

+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

We'll check one row in the events table, as follows:

SELECT _id :> JSON AS _id, _more :> JSON AS _more FROM events LIMIT 1;

The output should be similar to the following:

+-------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id                                 | _more                                                                                                                                                                                                      |
+-------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid":"66f671eff41356679e24336c"} | {"advertiser":"Starbucks","campaign":13,"country":"CA","event_name":"Click","gender":"Male","income":"unknown","page_url":"/2014/07/balloon-arch-tutorial.html/8/","region":"Alberta","user_id":164271946} |
+-------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The CDC solution has successfully connected to MongoDB Atlas and replicated all 100 records to SingleStore Kai.
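
Each replicated document arrives as two BSON columns: `_id`, and `_more`, which holds the remaining fields. If a client needs the original document shape, the two JSON values returned by the query above can be merged back together. A small sketch, assuming the values are fetched as JSON strings as in the sample output and that `_id` is an ObjectId; the function name `to_document` is hypothetical.

```python
import json


def to_document(id_json: str, more_json: str) -> dict:
    """Merge the _id and _more JSON columns back into a single document."""
    # Assumes _id is an ObjectId rendered as {"$oid": "..."}.
    document = {"_id": json.loads(id_json)["$oid"]}
    document.update(json.loads(more_json))
    return document


# Values copied from the sample row shown above.
row_id = '{"$oid":"66f671eff41356679e24336c"}'
row_more = (
    '{"advertiser":"Starbucks","campaign":13,"country":"CA",'
    '"event_name":"Click","gender":"Male","income":"unknown",'
    '"page_url":"/2014/07/balloon-arch-tutorial.html/8/",'
    '"region":"Alberta","user_id":164271946}'
)

doc = to_document(row_id, row_more)
print(doc["_id"], doc["advertiser"], doc["user_id"])
```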

Let's now create a dashboard using Metabase.

Metabase

Details of how to install, configure and create a connection to Metabase were described in a previous article. We'll create visualisations using slight variations of the queries used in the earlier article.

1. Total Number of Events

SELECT COUNT(*) FROM events;

2. Events by Region

SELECT _more::country AS `events.country`, COUNT(_more::country) AS `events.countofevents`
FROM adtech.events AS events
GROUP BY 1;

3. Events by Top 5 Advertisers

SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonald%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. Ad Visitors by Gender and Income

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE
        WHEN xx.z___min_rank = xx.z___rank THEN 1
        ELSE 0
      END AS z__is_highest_ranked_cell
    FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank
        FROM (SELECT *, RANK() OVER (ORDER BY CASE
                WHEN bb.z__pivot_col_rank = 1 THEN (CASE
                    WHEN bb.`events.count` IS NOT NULL THEN 0
                    ELSE 1
                  END)
                ELSE 2
              END, CASE
                WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`
                ELSE NULL
              END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank
            FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE
                    WHEN ww.`events.gender` IS NULL THEN 1
                    ELSE 0
                  END, ww.`events.gender`) AS z__pivot_col_rank
                FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`
                    FROM adtech.events AS events
                    WHERE (_more::income <> 'unknown' OR _more::income IS NULL)
                    GROUP BY 1, 2) ww) bb
            WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;
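
The query is long because it also computes ranking columns for the pivot, but its core is the innermost SELECT: count events per (gender, income) pair, excluding rows whose income is 'unknown'. For intuition, the same aggregation and pivot can be sketched in plain Python; the sample rows and income bands below are made up for illustration.

```python
from collections import Counter

# Made-up sample rows standing in for (gender, income) pairs from the events table.
rows = [
    ("Female", "25k and below"),
    ("Male", "25k - 50k"),
    ("Female", "25k and below"),
    ("Male", "unknown"),
    ("Female", "25k - 50k"),
]

# Innermost SELECT: drop 'unknown' incomes, then count per (gender, income).
counts = Counter((gender, income) for gender, income in rows if income != "unknown")

# Pivot: one row per income band, one column per gender.
genders = sorted({gender for gender, _ in counts})
pivot = {
    income: {gender: counts.get((gender, income), 0) for gender in genders}
    for income in sorted({income for _, income in counts})
}

for income, by_gender in pivot.items():
    print(income, by_gender)
```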

Figure 2 shows an example of the charts sized and positioned on the AdTech dashboard. We'll set the auto-refresh option to 1 minute.

Figure 2. Final Dashboard.


If we load more data into MongoDB Atlas using the Jupyter notebook by changing MAX_ITERATIONS, we'll see the data propagated to SingleStore Kai and the new data reflected in the AdTech dashboard.

Summary

In this article, we created a CDC pipeline to augment MongoDB Atlas with SingleStore Kai. SingleStore Kai is a good fit for analytics on MongoDB data, and several benchmarks highlight its performance advantage for analytical queries. We also used Metabase to create a quick visual dashboard to help us gain insights into our Ad Campaign.
