Introduction
In this post, we present an introductory example of using Apache Pinot to ingest an Apache Kafka stream. It builds upon existing Apache Pinot material from the official trainings and documentation. The purpose here is not just to rehash what is in the official docs, but to prepare the ground for a second part. The idea is to adapt the official examples to this end. Moreover, when I tried to run these examples, I had some extra ideas on how to better present the material. Part of the presented setup is also based on yet another Apache Pinot example, from a complementary series of lectures written for JavaScript. Our focus here is Python. Here are the two references I used:
- Lecture 4 (https://github.com/startreedata/learn/tree/main/pinot-advanced/04-stream-ingestion), from a series on advanced Pinot usage by StarTree. I ported the JS example to Python.
- The continuously updated Streamlit example
Another purpose of this introduction is to document my learning process, so that I can use it later as a reference or as personal notes. Consequently, the coherence of the material presented is of paramount importance.
For a formal introduction to Apache Pinot, the excellent playlists below are highly recommended.
Apache Pinot 101
Apache Pinot 201
Let's start our journey.
Booting up the setup and running our first streaming session
Our setup is completely local and uses Podman exclusively. All commands are executed on Windows 11 using Command Prompt terminals under VSCodium. You might need to apply some minor changes for your environment.
The docker compose file is mostly covered here. We just added a .env file for convenience.
podman compose up -d
This starts an Apache Kafka single-node cluster and an Apache Pinot cluster with one Controller, one Broker, and one Server node. More on this later. You can visit the Apache Pinot Controller UI here.
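If you would rather verify from code than from the browser, a quick probe of the Controller also works. This is a minimal sketch, assuming the Controller is exposed on its default port 9000 and using its standard /health endpoint:
import requests

# Probe the Pinot Controller; port 9000 is the default and is an
# assumption about the compose file used here.
resp = requests.get("http://localhost:9000/health")
print(resp.status_code, resp.text)  # expect 200 and "OK"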
Having started Apache Kafka and Apache Pinot, we need to push some data to Apache Kafka and link Apache Pinot to it through a streaming table. As in both references, we will use the Wikipedia page edits event stream as a data source. Every page edit on Wikipedia is recorded as an event, and there are many page edits happening all around the world in Wikipedia's ever-growing body of knowledge. This happens literally continuously, and such activity can be modeled as an event source.
The events are made public at https://stream.wikimedia.org/v2/stream/recentchange, and anyone can visit that URL with a browser and watch them flow by. Obviously, the typical web surfer is not interested in this overwhelming, ever-growing list of repetitive JSON content; it is so large that one has to resort to Data Analytics methods to make sense of it. Moreover, this event stream is not structured to convey meaning the way a typical web page does. On the contrary, methods of Data Engineering are necessary to capture it in a streaming table (Apache Spark terminology is used here), apply whatever data transformations are necessary, and then make it available to a Data Analytics system for visualizing its different aspects.
First, we need to understand the data source. It is delivered in what is commonly referred to as the SSE (Server-Sent Events) format. Wikipedia, unsurprisingly, has a very detailed documentation page on this, which also lists various code snippets on how to consume it. In terms of Data Engineering, it is a web service that exposes continuous streams of structured event data over HTTP.
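To get a feel for the raw transport before reaching for a client library, you can stream the endpoint with plain requests. This is a minimal sketch for illustration only; the User-Agent value is the one used later in the article.
import itertools
import requests

url = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {"User-Agent": "advanced_pinot_tutorial"}

# SSE is plain text over HTTP: frames of "event: ...", "id: ..." and
# "data: {json}" lines, separated by blank lines.
with requests.get(url, stream=True, headers=headers) as resp:
    for line in itertools.islice(resp.iter_lines(decode_unicode=True), 20):
        print(line[:120])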
For a Data Engineer, the source transport format is half the story; the rest is the schema. It is available here.
In terms of software development, this means we need a client library. There are many, but SSE client stands out; it is also used in the Streamlit tutorial of Apache Pinot. For simplicity, we will use the Wikipedia approach.
Here is the adapted code from Wikipedia.
import json

# Assumption: the Wikipedia documentation example uses the requests-sse
# package, which provides the EventSource context manager used below.
from requests_sse import EventSource

url = 'https://stream.wikimedia.org/v2/stream/recentchange'
headers = {"User-Agent": "advanced_pinot_tutorial"}

with EventSource(url, headers=headers) as stream:
    for event in stream:
        if event.type == 'message':
            try:
                change = json.loads(event.data)
                # Rename timestamp to ts and convert seconds to milliseconds
                change['ts'] = change['timestamp'] * 1000
                del change['timestamp']
                # Kafka placeholder code is here
            except ValueError:
                pass
From the schema, what stands out for a streaming source is the timestamp:
timestamp:
description: Unix timestamp (derived from rc_timestamp).
type: integer
maximum: 9007199254740991
minimum: -9007199254740991
The rename from timestamp to ts avoids a conflict with any built-in timestamp function, and the multiplication converts the Unix timestamp from seconds to milliseconds. Keep this in mind.
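A quick sanity check of the conversion, with a hypothetical epoch value:
import datetime

ts_seconds = 1715515092        # hypothetical value of change['timestamp']
ts_millis = ts_seconds * 1000  # what ends up in change['ts']
print(datetime.datetime.fromtimestamp(ts_seconds, tz=datetime.timezone.utc))
# 2024-05-12 11:58:12+00:00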
Now we need some code to push to an Apache Kafka topic. We use the confluent-kafka library.
First we set up our Apache Kafka connection (we implicitly assume the default port 9092 for Apache Kafka), which is pretty much self-explanatory:
import datetime  # used below when logging flushes

# Imports from the confluent-kafka package
from confluent_kafka import Producer, admin

kafka_topic_name = "wikipedia-events"
# conf = {'bootstrap.servers': 'redpanda-0,redpanda-1,redpanda-2'}
conf = {'bootstrap.servers': 'kafka'}
kafka_admin = admin.AdminClient(conf)
# Recreate the topic: 1 partition, replication factor 1
kafka_admin.delete_topics([kafka_topic_name])
kafka_admin.create_topics([admin.NewTopic(kafka_topic_name, 1, 1)])
producer = Producer(conf)
and then, in the Apache Kafka placeholder of the previous snippet, we put the push logic
producer.poll(0)  # serve delivery callbacks from previous produce() calls
producer.produce(kafka_topic_name, key=change["meta"]["id"], value=json.dumps(change), callback=acked)
events_processed += 1
if events_processed == 100:
    print(f"{str(datetime.datetime.now())} Flushing after {events_processed} events")
    producer.flush()  # block until all queued messages are delivered
    events_processed = 0
Every 100 events, we flush the producer and log the push of the batch. Confluent has very good documentation on how this library is used.
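The acked delivery callback and the events_processed counter are referenced but not defined in the snippet above. Here is a minimal sketch of both, following the standard confluent-kafka delivery-report signature:
events_processed = 0  # initialized once, before the event loop

def acked(err, msg):
    # Delivery report: invoked from poll()/flush() for each produced message
    if err is not None:
        print(f"Failed to deliver message: {err}")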
We package the application as a Docker image
podman build -t pinot-advanced/python-streaming-ingest ./producer-app
and then, we run it
podman run -it --network=pinot-advanced pinot-advanced/python-streaming-ingest:latest
Now it is time to verify that the Apache Kafka push is working properly. For convenience, a consumer Python app is provided. You can start it with similar commands
podman build -t pinot-advanced/python-kafka-consumer ./consumer-app
podman run -it --network=pinot-advanced pinot-advanced/python-kafka-consumer:latest
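If you are curious what the consumer app boils down to, here is a minimal sketch using confluent-kafka. The group id is a hypothetical name; everything else matches the producer side.
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'kafka',
    'group.id': 'wikipedia-events-checker',  # hypothetical group id
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['wikipedia-events'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # Print the key and the beginning of the JSON payload
        print(msg.key(), msg.value()[:80])
finally:
    consumer.close()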
Everything seems to work fine.
Setting up Apache Pinot and running our first query
In order to create the streaming table, we need to tell Apache Pinot both the transport format and the schema. The schema need not be exhaustive; it only needs to include the subset of fields we care about. For this reason we need two files.
The schema file.
Each column in Apache Pinot falls into one of the following field categories:
- Dimension
- Metric
- Date/Time
It is pretty obvious what the last one is used for. The first one is for filtering (used for drilling down), and the second one is for aggregations. This explicit distinction does not exist in relational databases or most other Big Data solutions, and it is part of what makes Apache Pinot a true Big Data streaming solution.
We will not need any metric fields, since we get a stream of page edits rather than numeric measurements. We will do what people call distinctCount, which in reality is an aggregation, but the fields we will use are not numeric, so they cannot go into the metric fields section. Here is the schema file:
{
"schemaName": "wikievents",
"dimensionFieldSpecs": [
{
"name": "metaJson",
"dataType": "STRING"
},
{
"name": "user",
"dataType": "STRING"
},
{
"name": "domain",
"dataType": "STRING"
},
{
"name": "topic",
"dataType": "STRING"
}
],
"dateTimeFieldSpecs": [
{
"name": "ts",
"dataType": "LONG",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
The config file.
The next file holds the table configuration and transport format. See https://github.com/fithisux/visualize-streamlit-pinot-example/blob/main/scripts/wikipedia_events_realtime_table_config.json for the details.
I will just focus on this snippet:
{
"transformConfigs": [
{
"columnName": "domain",
"transformFunction": "JSONPATH(metaJson, '$.domain')"
},
{
"columnName": "topic",
"transformFunction": "JSONPATH(metaJson, '$.topic')"
}
]
},
This is necessary so as to extract the fields from the JSON payload of the Apache Kafka message. The fields topic and domain are thus computed fields, and for this reason we need to explicitly expose the metaJson column.
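To make the transform concrete, here is what it computes on a hand-made metaJson value (the field values below are hypothetical):
import json

meta_json = json.dumps({
    "id": "some-uuid",  # hypothetical
    "domain": "en.wikipedia.org",
    "topic": "eqiad.mediawiki.recentchange",
})

meta = json.loads(meta_json)
print(meta["domain"])  # what JSONPATH(metaJson, '$.domain') extracts
print(meta["topic"])   # what JSONPATH(metaJson, '$.topic') extracts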
Our first query
With the compose file and streamer app up and running, we will construct our table in Apache Pinot.
podman run -it --network=pinot-advanced -v ./scripts/wikipedia_events_schema.json:/scripts/wikipedia_events_schema.json -v ./scripts/wikipedia_events_realtime_table_config.json:/scripts/wikipedia_events_realtime_table_config.json apachepinot/pinot:latest-25-ms-openjdk AddTable -schemaFile /scripts/wikipedia_events_schema.json -tableConfigFile /scripts/wikipedia_events_realtime_table_config.json -controllerHost pinot-controller -exec
We mount ./scripts on a purpose-built container that uses the schema and the table config to create the table.
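To confirm the table was created without opening the UI, you can also ask the Controller's REST API; a quick sketch, assuming the Controller on localhost:9000:
import requests

# List all tables known to the Controller
print(requests.get("http://localhost:9000/tables").json())
# expect something like {'tables': ['wikievents']}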
You can view the table by navigating to the Pinot Controller locally here and run your first query
select domain, topic, user, ts from wikievents limit 10;
Here is a sample of what you should expect
Running the dashboard
Deviating from the sample Streamlit app provided by StarTree, but similar in spirit, we provide a dashboard. Before delving into the code base, let's clarify the business logic of the dashboard. We run a sampling query over a window that extends from the sampling time 1 minute back into the past. In this window we sample three important quantities:
- The number of changes that happened
- The different users that committed these changes
- The different domains where these changes took place.
Our dashboard will carry the current sample plus a window back in time of the 30 latest samples. For visualization we will record each sample and plot the 30-sample buffer as a visual summary. Our dashboard will be implemented with the Panel Python package in a notebook. I used VSCodium for convenience. It is advised to create a virtual environment, install the dependencies there, and then use it as the kernel for executing the notebook.
Obtaining the sample is just an Apache Pinot query away:
select
count(*) AS events1Min,
distinctcount(user) AS users1Min,
distinctcount(domain) AS domains1Min
from wikievents_REALTIME
where ts > ago('PT1M')
limit 1;
The ago function takes an ISO 8601 duration and constructs the lower bound of the window.
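Since only the duration changes between windows, a tiny helper (hypothetical, not part of the original code) makes it easy to experiment with other windows such as PT5M or PT1H:
def sampling_query(window: str = "PT1M") -> str:
    # Build the sampling query for an arbitrary ISO 8601 duration
    return f"""
    select
      count(*) AS events1Min,
      distinctcount(user) AS users1Min,
      distinctcount(domain) AS domains1Min
    from wikievents_REALTIME
    where ts > ago('{window}')
    limit 1;
    """

print(sampling_query("PT5M"))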
This is our main building block. To implement our sampling logic, here is the relevant notebook cell:
from pinotdb import connect
import pandas as pd

# Connect to the Pinot Broker (default query port 8099)
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')

list_of_samples = []

def get_changes():
    query = """
    select
      count(*) AS events1Min,
      distinctcount(user) AS users1Min,
      distinctcount(domain) AS domains1Min
    from wikievents_REALTIME
    where ts > ago('PT1M')
    limit 1;
    """
    curs = conn.cursor()
    curs.execute(query)
    temp_df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
    temp_df['sample_time'] = pd.Timestamp.now()
    list_of_samples.append(temp_df)
    # Keep only the 30 latest samples
    if len(list_of_samples) > 30:
        list_of_samples.pop(0)
    return temp_df.to_dict('records')[0], pd.concat(list_of_samples).sort_values(by=["sample_time"])
The sample is returned as a dict, while the past buffer is concatenated into a pandas data frame. A sample execution follows
({'events1Min': 2216,
'users1Min': 362,
'domains1Min': 80,
'sample_time': Timestamp('2026-05-12 12:58:12.996165')},
events1Min users1Min domains1Min sample_time
0 2216 362 80 2026-05-12 12:58:12.996165)
The next cell sets up the reactivity of our data
# Necessary for reactive pandas
import panel as pn
import hvplot.pandas

pn.extension()

# Initial sample and reactive wrappers around it
sample_df, samples_df = get_changes()
table_changes = pn.rx(sample_df)
samples_df_rx = pn.rx(samples_df)

def update_table_changes():
    # Refresh the reactive values with a new sample
    sample_df, samples_df = get_changes()
    table_changes.rx.value = sample_df
    samples_df_rx.rx.value = samples_df

# Re-sample every 60 seconds
pn.state.add_periodic_callback(update_table_changes, period=60000)
See the documentation of the Panel library here. The most important statement is the last one, which sets up a 1-minute update period for the data feeding our plots.
The next cells create a dashboard with the absolute defaults; no effort to tinker with CSS is made. I will not spend time on the Panel components, since the documentation is very thorough. What is remarkable, though, is that you can directly serve the notebook with Panel. From your activated virtual environment run
panel serve .\dashboard.ipynb
and you can navigate to http://localhost:5006/dashboard to visit your dashboard.
Epilogue
In the above article we gave an example of an end-to-end dashboard backed by an Apache Pinot streaming table. The original stream comes from an Apache Kafka topic; it captures the Wikipedia page edits and is customarily used for streaming tutorials. We gave a quick description of the Apache Kafka and Apache Pinot setup, how to ingest the page edits, and how to visualize them. RedPanda can be used instead of Apache Kafka; see the related Readme.md for the necessary, but minimal, changes. As always, the code is provided. If you find something that is not clear, spot a bug, or have any suggestion, do not hesitate to post in the comments. I hope you enjoyed it.




