DEV Community

Yet another end-to-end streaming dashboarding example

Introduction

In this post, we present an introductory example that uses Apache Pinot to ingest an Apache Kafka stream. It builds upon existing Apache Pinot material from the official trainings and documentation. The purpose here is not just to rehash what is in the official docs, but to prepare the ground for a second part by adapting the official examples to that end. Moreover, when I ran these examples myself, I had some extra ideas on how to better present the material. Part of the presented setup is also based on yet another Apache Pinot example, from a complementary series of lectures written for JavaScript; our focus here is Python. Here are the two references I used:

  1. Lecture 4 (https://github.com/startreedata/learn/tree/main/pinot-advanced/04-stream-ingestion), from a StarTree series on advanced Pinot usage. I ported the JS example to Python.
  2. The continuously updated Streamlit example

Another purpose of this introduction is to document my learning process, so that I can use it later as a reference or as personal notes. Consequently, the coherence of the presented material is of paramount importance.

For a formal introduction to Apache Pinot, the excellent playlists below are highly recommended.

Apache Pinot 101
Apache Pinot 201

Let's start our journey.

Booting up setup and running our first streaming session

Our setup is completely local and uses Podman exclusively. All the commands are executed on Windows 11 in Command Prompt terminals under VSCodium. You might need to apply some minor changes for your environment.

The Docker Compose file is mostly covered here. We just added a .env file for convenience.

podman compose up -d

This starts a single-node Apache Kafka cluster and an Apache Pinot cluster with one Controller, one Broker, and one Server node. More on this later. You can visit the Apache Pinot Controller UI here.

Having started Apache Kafka and Apache Pinot, we need to push some data to Apache Kafka and link Apache Pinot to it through a streaming table. As in both references, we will use the Wikipedia page-edits event stream as a data source. Every page edit on Wikipedia is recorded as an event, and edits happen all over the world, literally continuously, in an ever-increasing body of knowledge; such activity can be modeled as an event source. The stream is public at https://stream.wikimedia.org/v2/stream/recentchange and you can visit it with a browser to see the events. Obviously, the typical web surfer is not interested in this overwhelming, ever-growing list of repetitive JSON content; it is so large that one has to resort to Data Analytics methods to make sense of it. Moreover, the event stream is not structured to convey meaning the way a typical web page does. Instead, Data Engineering methods are necessary to capture it in a streaming table (Apache Spark terminology is used here), perform whatever data transformations are necessary, and then make it available to a Data Analytics system for visualizing its different aspects.
First, we need to understand the data source. It is delivered in what is commonly referred to as SSE (Server-Sent Events) format. Wikipedia, unsurprisingly, has a very detailed documentation page on this, which also lists various code snippets on how to consume it. In terms of Data Engineering, it

is a web service that exposes continuous streams of structured event data. It does so over HTTP.

For a Data Engineer, the source transport format is half the story. The rest is the schema, which is available here.

In terms of software development, this means that we need a client library. There are many, but the SSE client stands out; it is also used in the Streamlit tutorial for Apache Pinot. For simplicity, we will use the Wikipedia approach.

Here is the adapted code from Wikipedia.

import json

# EventSource comes from an SSE client library; the Wikipedia docs
# list a few options (we assume one with a context-manager API).
url = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {"User-Agent": "advanced_pinot_tutorial"}

with EventSource(url, headers=headers) as stream:
    for event in stream:
        if event.type == 'message':
            try:
                change = json.loads(event.data)
                # rename 'timestamp' to 'ts' and convert seconds to milliseconds
                change['ts'] = change['timestamp'] * 1000
                del change['timestamp']

                # Kafka placeholder code goes here

            except ValueError:
                pass

From the schema, what stands out for a streaming source is the timestamp:

timestamp:
description: Unix timestamp (derived from rc_timestamp).
type: integer
maximum: 9007199254740991
minimum: -9007199254740991

The rename above avoids a conflict with any internal timestamp function. We also convert the Unix timestamp from seconds to milliseconds. Keep this in mind.

Now we need some code to push the events to an Apache Kafka topic. We use the confluent-kafka library.

First, we set up our Apache Kafka connection (we implicitly assume the default port 9092 for Apache Kafka), which is pretty much self-explanatory:

from confluent_kafka import Producer
from confluent_kafka import admin

kafka_topic_name = "wikipedia-events"

# conf = {'bootstrap.servers': 'redpanda-0,redpanda-1,redpanda-2'}
conf = {'bootstrap.servers': 'kafka'}

kafka_admin = admin.AdminClient(conf)

# recreate the topic from scratch on every run
kafka_admin.delete_topics([kafka_topic_name])
kafka_admin.create_topics([admin.NewTopic(kafka_topic_name, 1, 1)])

producer = Producer(conf)

and then, in the Apache Kafka placeholder of the previous snippet, we put the push logic:

producer.poll(0)  # serve delivery callbacks from previous produce() calls
producer.produce(kafka_topic_name, key=change["meta"]["id"], value=json.dumps(change), callback=acked)

events_processed += 1  # initialized to 0 before the loop
if events_processed == 100:
    print(f"{str(datetime.datetime.now())} Flushing after {events_processed} events")
    producer.flush()
    events_processed = 0

Every 100 events, we flush the producer and log the push of the batch. Confluent has very good documentation on how this library is used.
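The snippet above references an `acked` delivery callback that is not shown. A minimal sketch (the name matches the snippet; the body is my own, following the usual confluent-kafka delivery-report pattern) could look like this:

```python
def acked(err, msg):
    # Delivery report callback, invoked from producer.poll() or producer.flush().
    # err is None on success; msg carries topic/partition/offset of the delivery.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
```

Without such a callback (or at least periodic `poll()` calls), delivery errors would go unnoticed.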

We package the application as a Docker image

podman build -t pinot-advanced/python-streaming-ingest ./producer-app

and then, we run it

podman run -it  --network=pinot-advanced pinot-advanced/python-streaming-ingest:latest

Producer app execution

Now it is time to verify that the Apache Kafka push is working properly. For convenience, a consumer Python app is provided. You can start it with similar commands:

podman build -t pinot-advanced/python-kafka-consumer ./consumer-app
podman run -it  --network=pinot-advanced pinot-advanced/python-kafka-consumer:latest

Consumer app execution

Everything seems to work fine.

Setting up Apache Pinot and running our first query

In order to create the streaming table, we need to tell Apache Pinot both the transport format and the schema. The schema need not be exhaustive; it only has to include the subset of fields we need. For this reason we need two files.

The schema file.

Each column in Apache Pinot has one of the following types.

  • Dimension
  • Metric
  • Date/Time

It is pretty obvious what the last one is used for. The first is for filtering (used for drilling down), and the second is for aggregations. This distinction does not exist in relational databases or other Big Data solutions, and it is part of what makes Apache Pinot a true Big Data streaming solution.

We will not need any metric fields, since we get a stream of page edits. We will do what people call distinct counts, which is in reality an aggregation, but the fields we will use are not numeric, so they cannot go in the metric fields section. Here it is:

{
  "schemaName": "wikievents",
  "dimensionFieldSpecs": [
    {
      "name": "metaJson",
      "dataType": "STRING"
    },
    {
      "name": "user",
      "dataType": "STRING"
    },
    {
      "name": "domain",
      "dataType": "STRING"
    },
    {
      "name": "topic",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

The config file.

The next one is the table configuration and transport format. See https://github.com/fithisux/visualize-streamlit-pinot-example/blob/main/scripts/wikipedia_events_realtime_table_config.json for the details.

I will just focus on this snippet:

{
    "transformConfigs": [
      {
        "columnName": "domain",
        "transformFunction": "JSONPATH(metaJson, '$.domain')"
      },
      {
        "columnName": "topic",
        "transformFunction": "JSONPATH(metaJson, '$.topic')"
      }
    ]
  },

This is necessary to grab the fields from the JSON payload of the Apache Kafka message. The topic and domain fields are computed fields, and for this reason we need to explicitly expose the metaJson column.
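To make the transform concrete, here is a small Python sketch that mimics what the two JSONPATH expressions do, on a hand-made metaJson payload (the sample values are invented for illustration):

```python
import json

# A trimmed-down, invented example of the "meta" object carried by an event
meta_json = json.dumps({
    "id": "abc-123",
    "domain": "en.wikipedia.org",
    "topic": "eqiad.mediawiki.recentchange",
})

# Equivalent of JSONPATH(metaJson, '$.domain') and JSONPATH(metaJson, '$.topic')
meta = json.loads(meta_json)
domain = meta["domain"]
topic = meta["topic"]
print(domain, topic)
```

Pinot performs exactly this kind of extraction at ingestion time, once per incoming message.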

Our first query

With the compose file and streamer app up and running, we will construct our table in Apache Pinot.

podman run -it --network=pinot-advanced -v ./scripts/wikipedia_events_schema.json:/scripts/wikipedia_events_schema.json -v ./scripts/wikipedia_events_realtime_table_config.json:/scripts/wikipedia_events_realtime_table_config.json apachepinot/pinot:latest-25-ms-openjdk AddTable -schemaFile /scripts/wikipedia_events_schema.json -tableConfigFile /scripts/wikipedia_events_realtime_table_config.json -controllerHost pinot-controller -exec

We mount ./scripts on a purpose-built container that uses the schema and table config to create the table.

You can view the table by navigating to the Pinot Controller locally here and run your first query

select domain, topic, user, ts from wikievents limit 10;

Here is a sample of what you should expect

Sample query execution

Running the dashboard

Deviating from the sample Streamlit app provided by StarTree, but similar in spirit, we provide a dashboard. Before delving into the code base, let's clarify its business logic. We run a sampling query over a window reaching from the sampling time one minute back into the past. In this window we sample three important quantities:

  1. The number of changes that happened
  2. The number of distinct users that committed these changes
  3. The number of distinct domains where these changes took place
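In plain Python terms, the three quantities over a window of events would be computed like this (the event list is invented for illustration; Pinot computes the same aggregates server-side):

```python
# Hypothetical one-minute window of simplified events
window = [
    {"user": "alice", "domain": "en.wikipedia.org"},
    {"user": "bob",   "domain": "de.wikipedia.org"},
    {"user": "alice", "domain": "en.wikipedia.org"},
]

sample = {
    "events1Min":  len(window),                        # count(*)
    "users1Min":   len({e["user"] for e in window}),   # distinctcount(user)
    "domains1Min": len({e["domain"] for e in window}), # distinctcount(domain)
}
print(sample)
```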

Our dashboard will carry the current sample and a window back in time of the 30 latest samples. For visualization, we record each sample and plot the 30-sample buffer as a visual summary. Our dashboard is implemented with the Panel Python package in a notebook. I used VSCodium for convenience. It is advisable to create a virtual environment, install the dependencies there, and then use it as a kernel for executing the notebook.

VScodium setup

Obtaining the sample is just an Apache Pinot query away:

select 
   count(*) AS events1Min,
   distinctcount(user) AS users1Min,
   distinctcount(domain) AS domains1Min
from wikievents_REALTIME
where ts > ago('PT1M')
limit 1;

The ago function uses the ISO 8601 duration format to construct a lower bound for the window.
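For intuition, ago('PT1M') simply resolves to the current epoch time in milliseconds minus one minute. An equivalent bound could be computed on the client side and interpolated into the query (a sketch; the function name is my own):

```python
import datetime

def one_minute_ago_ms(now=None):
    """Epoch milliseconds one minute in the past, matching ago('PT1M')."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    return int((now - datetime.timedelta(minutes=1)).timestamp() * 1000)

# could then be used as: where ts > {one_minute_ago_ms()}
```

Note that the bound must be in milliseconds, matching the ts column's granularity from the schema.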

This is our main building block. To implement our sampling logic, here is the relevant notebook cell:

from pinotdb import connect
import pandas as pd

conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')

list_of_samples = []

def get_changes():
    query = """
        select 
                count(*) AS events1Min,
                distinctcount(user) AS users1Min,
                distinctcount(domain) AS domains1Min
        from wikievents_REALTIME
        where ts > ago('PT1M')
        limit 1;
    """

    curs = conn.cursor()

    curs.execute(query)

    temp_df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    temp_df['sample_time'] = pd.Timestamp.now()

    list_of_samples.append(temp_df)
    if len(list_of_samples) > 30:
        list_of_samples.pop(0)

    return temp_df.to_dict('records')[0], pd.concat(list_of_samples).sort_values(by=["sample_time"])

The sample is returned as a dict, while the past buffer is concatenated into a pandas data frame. A sample execution follows:

({'events1Min': 2216,
  'users1Min': 362,
  'domains1Min': 80,
  'sample_time': Timestamp('2026-05-12 12:58:12.996165')},
    events1Min  users1Min  domains1Min                sample_time
 0        2216        362           80 2026-05-12 12:58:12.996165)

The next cell sets up the reactivity of our data

# Necessary for reactive pandas
import panel as pn
import hvplot.pandas 

pn.extension()

sample_df, samples_df = get_changes()
table_changes = pn.rx(sample_df)
samples_df_rx = pn.rx(samples_df)

## Extract Data

def update_table_changes():
    sample_df, samples_df = get_changes()
    table_changes.rx.value = sample_df
    samples_df_rx.rx.value = samples_df

pn.state.add_periodic_callback(update_table_changes, period=60000)

See the documentation of the Panel library here. The most important statement is the last one, which sets up a 1-minute update period for the data feeding our plots.

The next cells create a dashboard with the absolute defaults; no effort to tinker with CSS is taken. I will not spend time on the Panel components, as the documentation is very thorough. What is remarkable, though, is that you can serve the notebook directly with Panel. From your activated virtual environment run

panel serve .\dashboard.ipynb

and you can navigate to http://localhost:5006/dashboard to visit your dashboard.

Wikipedia changes dashboard

Epilogue

In the above article we gave an example of an end-to-end dashboard backed by an Apache Pinot streaming table. The original stream comes from an Apache Kafka topic; it captures Wikipedia page edits and is customarily used in streaming tutorials. We gave a quick description of the Apache Kafka and Apache Pinot setup, how to ingest the page edits, and how to visualize them. Redpanda can be used instead of Apache Kafka; see the related Readme.md for the necessary, but minimal, changes. As always, the code is provided. If you find something unclear, spot a bug, or have any suggestion, do not hesitate to post in the comments. I hope you enjoyed it.
