<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Kepha Mwandiki</title>
    <description>The latest articles on DEV Community by Kepha Mwandiki (@kepha_mwandiki).</description>
    <link>https://dev.to/kepha_mwandiki</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3393960%2Ff545ee8f-4e74-40fd-bc22-053ad0ea8136.png</url>
      <title>DEV Community: Kepha Mwandiki</title>
      <link>https://dev.to/kepha_mwandiki</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/kepha_mwandiki"/>
    <language>en</language>
    <item>
      <title>From Kafka to Clean Tables: Building a Confluent Snowflake Pipeline with Streams &amp; Tasks</title>
      <dc:creator>Kepha Mwandiki</dc:creator>
      <pubDate>Wed, 01 Oct 2025 11:44:58 +0000</pubDate>
      <link>https://dev.to/kepha_mwandiki/from-kafka-to-clean-tables-building-a-confluent-snowflake-pipeline-with-streams-tasks-140d</link>
      <guid>https://dev.to/kepha_mwandiki/from-kafka-to-clean-tables-building-a-confluent-snowflake-pipeline-with-streams-tasks-140d</guid>
      <description>&lt;p&gt;Building reliable data pipelines often starts with messy JSON and ends with clean, analytics-ready tables. In this article, we will walk through the complete journey of streaming data from Kafka into Snowflake using the Confluent Snowflake Sink Connector. We will begin by generating private and public keys to authenticate Confluent with Snowflake, then set up the connector so raw JSON events land in a Snowflake table. From there, we will use Snowflake Streams and Tasks to continuously transform that nested JSON into a structured table. By the end, you’ll have a fully automated pipeline where Kafka pushes data through Confluent, Snowflake stores the raw records, and your clean tables stay updated in near real time, ready for dashboards, analytics, and reporting.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Setting up Confluent: Environment, Cluster, and Topic&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Confluent Environment&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;An environment acts as a logical workspace in Confluent Cloud where you can group together clusters, topics and connectors for a specific project.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In the screenshot below, you can see the environment dashboard, which is the starting point before spinning up a Kafka cluster and creating topics to hold our streaming data.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fshjrn3c21xqt3ttcmyw3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fshjrn3c21xqt3ttcmyw3.png" alt=" " width="681" height="575"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Confluent Cluster&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The cluster is where your topics live and where all the data streaming happens.&lt;/li&gt;
&lt;li&gt;&lt;p&gt;In Confluent Cloud, you can choose between Basic, Standard, or Dedicated clusters depending on workload.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;For this pipeline, a Basic or Standard cluster is sufficient to ingest weather data into our topic before pushing it downstream to Snowflake.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcvhbym1txzkma8ydfmq1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcvhbym1txzkma8ydfmq1.png" alt=" " width="800" height="326"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Cloud Provider&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;When creating the cluster, Confluent asks you to pick a cloud provider (AWS, Azure, or Google Cloud) and a region.&lt;/li&gt;
&lt;li&gt;This choice is about more than location: it affects latency, data transfer costs, and overall performance.&lt;/li&gt;
&lt;li&gt;&lt;em&gt;Always select the same cloud provider and region as your Snowflake account.&lt;/em&gt;&lt;/li&gt;
&lt;li&gt;In our case, Snowflake is hosted on AWS af-south-1 (Cape Town), so we placed our Confluent cluster in the same AWS region to ensure low latency and avoid unnecessary cross-region data transfer fees.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F614ufyi2yi0pl64awmw2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F614ufyi2yi0pl64awmw2.png" alt=" " width="800" height="618"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Confluent Client&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;After the cluster is provisioned, Confluent provides client connection configurations so that applications can produce or consume messages from the cluster.&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Since our producer script is written in Python, we download the Python client configuration.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;This file contains important details like the bootstrap servers, API key, and API secret, which authenticate the Python client to the Confluent cluster securely.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;These credentials are later used in our weather data producer script to publish JSON events (with fields such as city, temperature, and humidity) into the Kafka topic.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvps0l9al456m2d318k0x.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvps0l9al456m2d318k0x.png" alt=" " width="800" height="411"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F30bmosnpaymsv5auplp8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F30bmosnpaymsv5auplp8.png" alt=" " width="800" height="275"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Creating a Topic&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The final step in setting up Confluent is creating a Kafka Topic inside the cluster.&lt;/li&gt;
&lt;li&gt;A topic is a channel where streaming data is stored and organized before being delivered downstream.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flbe4rxe53jd2x2vy5um2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Flbe4rxe53jd2x2vy5um2.png" alt=" " width="800" height="394"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Data streaming into our Topic&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Once the topic is created, we can begin streaming data into it.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;In the screenshot below, you can see messages arriving in Confluent, which confirms that the producer side of the pipeline is working.&lt;/li&gt;
&lt;li&gt;This data is being produced by a Python producer script that I wrote earlier, which fetches live weather information and publishes it into the Kafka topic.&lt;/li&gt;
&lt;li&gt;On the other side, a simple Python consumer can also be used to verify that messages are flowing correctly before we connect downstream systems like Snowflake.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhfy0ek0pzx2txddktgiw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fhfy0ek0pzx2txddktgiw.png" alt=" " width="800" height="291"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Confluent Connectors and sinks&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Confluent offers a wide range of connectors that make it easy to move data between Kafka and external systems without writing custom code.&lt;br&gt;
In the screenshot below, you can see several of the available connectors: sources, which bring data into Kafka, and sinks, which deliver data out of Kafka.&lt;/p&gt;

&lt;p&gt;For our use case, we are interested in the Snowflake Sink Connector, because our goal is to continuously push streaming weather data from Kafka into Snowflake.&lt;/p&gt;

&lt;p&gt;The sink connector ensures that every new event in our topic is automatically written into a Snowflake table, making it available for storage and downstream analytics.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjxc8sbuv1ywrj3bdrdmy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjxc8sbuv1ywrj3bdrdmy.png" alt=" " width="800" height="398"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Snowflake Sink Connector&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;When setting up the Snowflake Sink Connector, the first step is to tell it which Kafka topic to read from.&lt;/li&gt;
&lt;li&gt;In this case, we select our previously created weather topic as the source.&lt;/li&gt;
&lt;li&gt;This ensures that every JSON message produced into the topic, containing fields like city, temperature, and humidity, is automatically ingested by the connector and prepared for delivery into Snowflake.&lt;/li&gt;
&lt;li&gt;By binding the connector to the topic, we establish the direct pipeline from Confluent into our Snowflake account.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpnmthruf5sptmowvse7c.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpnmthruf5sptmowvse7c.png" alt=" " width="800" height="296"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Sink connector API&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;To authenticate the connector, Confluent requires an API key and secret that grant access to the Kafka cluster. In our case, we will reuse the API key we generated when creating this cluster.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsfa3l96w6u8oa8e311si.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fsfa3l96w6u8oa8e311si.png" alt=" " width="800" height="424"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Configuring Snowflake Connection details&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The next step is to configure the Snowflake connection details.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Here we provide the account URL, which is derived from the Snowflake account locator and region, e.g. &lt;a href="https://xc15924.af-south-1.aws.snowflakecomputing.com" rel="noopener noreferrer"&gt;https://xc15924.af-south-1.aws.snowflakecomputing.com&lt;/a&gt;.&lt;/li&gt;
&lt;li&gt;We also specify the database, schema, and warehouse that will receive the data. All these should already be created in Snowflake.&lt;/li&gt;
&lt;li&gt;Finally, we provide the Snowflake user name and that user's private key. The matching public key is registered on the user in Snowflake, which enables key-pair authentication.&lt;/li&gt;
&lt;li&gt;This lets the connector deliver data securely into the right Snowflake environment without storing a password.&lt;/li&gt;
&lt;/ul&gt;
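
&lt;p&gt;As a rough sketch of that preparation, the statements below create the objects and grant a dedicated role the privileges Snowflake documents for its Kafka connector. The names WEATHER_DB, RAW, and KAFKA_CONNECTOR_ROLE are placeholders assumed for illustration and do not come from the setup shown here; COMPUTE_WH and USERNAME stand for your own warehouse and user.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Sketch only: WEATHER_DB, RAW and KAFKA_CONNECTOR_ROLE are hypothetical names.
-- Run with a role that may create objects and grant privileges (for example ACCOUNTADMIN).
CREATE DATABASE IF NOT EXISTS WEATHER_DB;
CREATE SCHEMA IF NOT EXISTS WEATHER_DB.RAW;
CREATE ROLE IF NOT EXISTS KAFKA_CONNECTOR_ROLE;

-- Privileges the connector needs to create its table, stage and pipe
GRANT USAGE ON DATABASE WEATHER_DB TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT USAGE ON SCHEMA WEATHER_DB.RAW TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE TABLE ON SCHEMA WEATHER_DB.RAW TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE STAGE ON SCHEMA WEATHER_DB.RAW TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE PIPE ON SCHEMA WEATHER_DB.RAW TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT USAGE ON WAREHOUSE COMPUTE_WH TO ROLE KAFKA_CONNECTOR_ROLE;

-- The connector works with the user's default role
GRANT ROLE KAFKA_CONNECTOR_ROLE TO USER USERNAME;
ALTER USER USERNAME SET DEFAULT_ROLE = KAFKA_CONNECTOR_ROLE;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Using a dedicated role keeps the connector's permissions limited to the one schema it writes to.&lt;/p&gt;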

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fp5toc1iunderddmmmpd1.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fp5toc1iunderddmmmpd1.png" alt=" " width="800" height="695"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Getting your Public and Private keys&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;u&gt;Public&lt;/u&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The public key is derived from the private key and is the one uploaded to Snowflake.&lt;/li&gt;
&lt;li&gt;This allows Snowflake to validate signatures created with the private key.
Here are the commands to extract the public key from the private key file rsa_key.p8 (generate it first, as shown in the Private section below):
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
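&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Prerequisite: rsa_key.p8 already exists (see the Private section).
# Generate the private key only once: regenerating it after deriving the public key breaks the pair.
# For a passphrase-protected key, the private key can instead be generated with:
#   openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

# Derive the public key from the private key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# Strip the BEGIN/END lines and newlines so the key fits on a single line
sed -e '1d' -e '$d' rsa_key.pub | tr -d '\n' &amp;gt; rsa_key.pub.single

# Print the single-line key to paste into Snowflake
cat rsa_key.pub.single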
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;&lt;u&gt;Private&lt;/u&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The private key is generated locally and never shared with Snowflake.&lt;/li&gt;
&lt;li&gt;It is used by the Snowflake Sink Connector to sign authentication requests.&lt;/li&gt;
&lt;li&gt;Below is the command I used to create an unencrypted private key in the PKCS#8 format required by Snowflake:
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;openssl genrsa 2048 | openssl pkcs8 -topk8 -nocrypt -out rsa_key.p8

cat rsa_key.p8

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;After extracting the public key, register it on your Snowflake user.&lt;/p&gt;

&lt;p&gt;Below is the statement for that, where &lt;u&gt;USERNAME = your Snowflake username&lt;/u&gt; and the value is the single-line key without the BEGIN/END lines.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;ALTER USER USERNAME SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';&lt;br&gt;
&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;The following command confirms that the public key has been set; check the RSA_PUBLIC_KEY_FP property in its output:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;DESC USER USERNAME;&lt;br&gt;
&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Data format and ingestion method&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;In the next step, the connector asks you to choose the data format and the ingestion method.&lt;/li&gt;
&lt;li&gt;For this pipeline, we select JSON as the format since our weather producer sends data as JSON.&lt;/li&gt;
&lt;li&gt;The connector offers two ingestion methods: Snowpipe (micro-batch loading) and Snowpipe Streaming (low-latency streaming ingestion).&lt;/li&gt;
&lt;li&gt;While both work, Snowpipe Streaming provides near real-time delivery into Snowflake, making it better suited for continuously flowing data like weather updates.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fei1gh56ear3hrpjw3xyu.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fei1gh56ear3hrpjw3xyu.png" alt=" " width="800" height="466"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Sizing&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The final step in configuring the connector is sizing, where you choose the number of tasks.&lt;/li&gt;
&lt;li&gt;Each task is a worker instance that reads from Kafka and writes to Snowflake.&lt;/li&gt;
&lt;li&gt;For small workloads, 1 task is enough.&lt;/li&gt;
&lt;li&gt;If you expect higher throughput, you can increase the task count to scale out ingestion.&lt;/li&gt;
&lt;li&gt;In our weather data pipeline, a single task is sufficient to handle the incoming JSON events.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs9qahwvl9egnrujtuwv5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs9qahwvl9egnrujtuwv5.png" alt=" " width="800" height="547"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Launching The Snowflake Sink Connector&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;The final step is just confirming everything, and launching the connector.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Folb9cs9xglwlzjtbd6ja.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Folb9cs9xglwlzjtbd6ja.png" alt=" " width="800" height="474"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Data arriving in Snowflake&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;When the Sink Connector pushes data into Snowflake, it creates a raw table with only two columns: &lt;u&gt;RECORD_METADATA&lt;/u&gt;, which holds Kafka details such as topic, partition, offset, and timestamp, and &lt;u&gt;RECORD_CONTENT&lt;/u&gt;, which stores the full message as JSON.&lt;/li&gt;
&lt;li&gt;This makes sure all raw events are captured safely, but it’s not easy to query since everything sits inside one big JSON field. That's why we later use Streams and Tasks to transform this raw data into a clean, structured table with proper columns for analytics.&lt;/li&gt;
&lt;/ul&gt;
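
&lt;p&gt;To get a feel for the two columns, a query along these lines can be run against the raw table. It is a minimal sketch that assumes the raw table is TOPIC_WEATHER_STREAM (the name used later in this article) and that the messages carry city and temperature fields.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Peek at Kafka metadata next to a couple of payload fields
SELECT
    RECORD_METADATA['topic']::string     AS kafka_topic,
    RECORD_METADATA['partition']::int    AS kafka_partition,
    RECORD_METADATA['offset']::int       AS kafka_offset,
    RECORD_CONTENT:city::string          AS city,
    RECORD_CONTENT:temperature::float    AS temperature
FROM TOPIC_WEATHER_STREAM
LIMIT 10;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Both the bracket form and the colon form read a field out of a VARIANT column; the cast after :: turns the value into a regular SQL type.&lt;/p&gt;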

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxebls8xlzsaab3yu3604.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fxebls8xlzsaab3yu3604.png" alt=" " width="800" height="413"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Creating a new structured table&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Now that the raw JSON is safely landing in Snowflake, the next step is to create a clean target table where our structured weather data will live. We will call this table &lt;u&gt;WEATHER_DATA_TABLE&lt;/u&gt;, and it will have properly defined columns like city, temperature, humidity, wind_speed, and others.&lt;/p&gt;

&lt;p&gt;This table will act as the final destination for all transformed data, making it much easier to query and analyse compared to the raw &lt;u&gt;RECORD_CONTENT&lt;/u&gt; JSON.&lt;/p&gt;

&lt;p&gt;Here is the SQL code for that:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Create the structured target table WEATHER_DATA_TABLE
CREATE OR REPLACE TABLE WEATHER_DATA_TABLE (
    CITY          STRING,
    LATITUDE      FLOAT,
    LONGITUDE     FLOAT,
    TEMPERATURE   FLOAT,
    TEMP_MIN      FLOAT,
    TEMP_MAX      FLOAT,
    PRESSURE      INT,
    HUMIDITY      INT,
    WIND_SPEED    FLOAT,
    CLOUDS        INT,
    W_CONDITION   STRING,
    DESCRIPTION   STRING,
    TIME_STAMP    TIMESTAMP
);

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Inserting raw data from the RECORD_CONTENT column into our new structured table&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;After creating &lt;u&gt;WEATHER_DATA_TABLE&lt;/u&gt;, the next step was to insert data from the &lt;u&gt;RECORD_CONTENT&lt;/u&gt; column of the raw table, &lt;u&gt;TOPIC_WEATHER_STREAM&lt;/u&gt;. All the weather details like city, temperature, humidity, wind_speed, and others were stored as JSON inside &lt;u&gt;RECORD_CONTENT&lt;/u&gt;. Using Snowflake's path notation and casts (for example RECORD_CONTENT:city::string), we pulled out each field from this JSON and placed it into the right column of WEATHER_DATA_TABLE. This turned the raw JSON in RECORD_CONTENT into a clean and structured table that is much easier to query and analyse.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Inserting data from RECORD_CONTENT into the new table
INSERT INTO WEATHER_DATA_TABLE
SELECT
RECORD_CONTENT:city::string,
RECORD_CONTENT:latitude::float,
RECORD_CONTENT:longitude::float,
RECORD_CONTENT:temperature::float,
RECORD_CONTENT:temp_min::float,
RECORD_CONTENT:temp_max::float,
RECORD_CONTENT:pressure::int,
RECORD_CONTENT:humidity::int,
RECORD_CONTENT:wind_speed::float,
RECORD_CONTENT:clouds::int,
RECORD_CONTENT:w_condition::string,
RECORD_CONTENT:description::string,
TO_TIMESTAMP(RECORD_CONTENT:time_stamp::int)
FROM TOPIC_WEATHER_STREAM;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Creating a stream that tracks new changes in the TOPIC_WEATHER_STREAM table&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;To keep the clean table updated as new data arrives, we created a Snowflake Stream on the raw topic table. A stream works like a tracker: it records the rows inserted into the raw table since the stream was last consumed. Instead of repeatedly scanning the whole table, we can just read the stream to see what fresh data has arrived. This makes it easy to continuously insert only the latest weather records from &lt;u&gt;RECORD_CONTENT&lt;/u&gt; into &lt;u&gt;WEATHER_DATA_TABLE&lt;/u&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Create a stream that tracks new rows inserted into the raw table
CREATE OR REPLACE STREAM WEATHER_STREAM_CHANGES
ON TABLE TOPIC_WEATHER_STREAM
APPEND_ONLY = TRUE;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
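
&lt;p&gt;Before automating anything, it can help to check what the stream currently holds. The statements below are a small sketch using the stream name created above; the city field is assumed to be present in the payload.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Returns TRUE when the stream holds rows that have not been consumed yet
SELECT SYSTEM$STREAM_HAS_DATA('WEATHER_STREAM_CHANGES');

-- Peek at pending rows; a plain SELECT does not advance the stream
SELECT
    RECORD_CONTENT:city::string AS city,
    METADATA$ACTION,
    METADATA$ISUPDATE
FROM WEATHER_STREAM_CHANGES
LIMIT 10;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The stream only moves forward when its rows are used in a DML statement such as an INSERT that commits, so these checks can be repeated safely.&lt;/p&gt;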



&lt;p&gt;&lt;strong&gt;Creating a Snowflake Task that automatically takes data from the stream and inserts it into our new table&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;After setting up the stream, the next step was to create a Snowflake Task.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;A task is like a scheduler inside Snowflake: it can automatically run SQL statements at regular intervals. In our case, we used the task to read new data from the stream and insert it into &lt;u&gt;WEATHER_DATA_TABLE&lt;/u&gt;. This way, we don’t have to manually run the INSERT query every time fresh weather data arrives.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The task checks the stream, finds any new rows in &lt;u&gt;RECORD_CONTENT&lt;/u&gt;, and then copies the values into the correct columns of &lt;u&gt;WEATHER_DATA_TABLE&lt;/u&gt;. Because the INSERT reads from the stream, each successful run consumes those rows, so the next run sees only newer data. By combining the stream, which tracks changes, with the task, which automates inserts, our pipeline becomes fully automated. Every time Kafka pushes new weather data through Confluent into Snowflake, the clean table updates on its own.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Create a task that runs every minute and inserts new rows into WEATHER_DATA_TABLE.

CREATE OR REPLACE TASK WEATHER_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
INSERT INTO WEATHER_DATA_TABLE
SELECT DISTINCT
RECORD_CONTENT:city::string,
RECORD_CONTENT:latitude::float,
RECORD_CONTENT:longitude::float,
RECORD_CONTENT:temperature::float,
RECORD_CONTENT:temp_min::float,
RECORD_CONTENT:temp_max::float,
RECORD_CONTENT:pressure::int,
RECORD_CONTENT:humidity::int,
RECORD_CONTENT:wind_speed::float,
RECORD_CONTENT:clouds::int,
RECORD_CONTENT:w_condition::string,
RECORD_CONTENT:description::string,
TO_TIMESTAMP(RECORD_CONTENT:time_stamp::int)
FROM WEATHER_STREAM_CHANGES;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
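
&lt;p&gt;One possible refinement, which was not part of the setup described here, is to let the task skip a scheduled run when the stream is empty. The sketch below attaches such a condition to the task defined above; it should be run while the task is still suspended.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Optional: only run the task body when the stream has unconsumed rows
ALTER TASK WEATHER_TASK
MODIFY WHEN SYSTEM$STREAM_HAS_DATA('WEATHER_STREAM_CHANGES');
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;With this condition in place, minutes without new weather data do not need to start the warehouse, which can reduce compute cost for a low-volume topic.&lt;/p&gt;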



&lt;ul&gt;
&lt;li&gt;Once the task was created, the final step was to enable it so it could start running on its own. By default, a new task in Snowflake is created in a suspended state, meaning it won’t execute until you turn it on. We enabled the task with a single SQL command, and from that moment, Snowflake automatically began checking the stream and inserting new weather data into &lt;u&gt;WEATHER_DATA_TABLE&lt;/u&gt; on schedule. This completed the automation, ensuring the clean table always stays up to date without any manual work.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;code&gt;-- Enabling the task created above&lt;br&gt;
ALTER TASK WEATHER_TASK RESUME;&lt;/code&gt;&lt;/p&gt;
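
&lt;p&gt;To confirm that the task is really running, its state and recent runs can be inspected. This is a minimal sketch using the task name from above; it has to be run in the database that contains the task.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- The state column shows whether the task is started or suspended
SHOW TASKS LIKE 'WEATHER_TASK';

-- Most recent runs, including any error message
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME, ERROR_MESSAGE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME =&amp;gt; 'WEATHER_TASK'))
ORDER BY SCHEDULED_TIME DESC
LIMIT 10;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A failed run leaves the stream untouched, so the same rows are picked up again on the next successful run.&lt;/p&gt;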

&lt;p&gt;&lt;strong&gt;Snowflake full setup showing our tables, stream and task&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In this screenshot, you can see the full setup in Snowflake: the raw topic table holding JSON in RECORD_CONTENT, the clean WEATHER_DATA_TABLE where structured data lives, the stream that tracks new records, and the task that automates the inserts. Together, these pieces form the end-to-end pipeline, from raw Kafka events landing in Snowflake all the way to a clean, continuously updated table that’s ready for analysis.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft0d7huciin748ne7g4ho.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft0d7huciin748ne7g4ho.png" alt=" " width="310" height="382"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Querying directly from our WEATHER_DATA_TABLE&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;With the pipeline complete, we can now query &lt;u&gt;WEATHER_DATA_TABLE&lt;/u&gt; just like any normal Snowflake table. Since the data is already structured into clean columns, queries are straightforward. For example, to see the latest weather updates, we can run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT *
FROM WEATHER_DATA_TABLE
ORDER BY TIME_STAMP DESC
LIMIT 110;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
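
&lt;p&gt;Structured columns also make slightly richer questions easy to ask. As an illustrative sketch, the query below returns only the most recent reading for each city, using the column names defined in WEATHER_DATA_TABLE.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Latest reading per city
SELECT CITY, TEMPERATURE, HUMIDITY, WIND_SPEED, TIME_STAMP
FROM WEATHER_DATA_TABLE
QUALIFY ROW_NUMBER() OVER (PARTITION BY CITY ORDER BY TIME_STAMP DESC) = 1;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;QUALIFY filters on the window function directly, which avoids wrapping the query in a subquery.&lt;/p&gt;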



&lt;p&gt;&lt;strong&gt;Result showing our clean data in rows &amp;amp; columns&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In this screenshot, we can see the results of querying &lt;u&gt;WEATHER_DATA_TABLE&lt;/u&gt;. Unlike the raw JSON format, the data is now well organised into proper columns: city, temperature, humidity, wind speed, timestamp, and more. Everything is clean and easy to read, which makes analysis simple and efficient.&lt;/p&gt;

&lt;p&gt;This confirms that our pipeline successfully transformed the raw Kafka events into a structured Snowflake table ready for use.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgzbmacgq84gev2dmwe1x.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgzbmacgq84gev2dmwe1x.png" alt=" " width="800" height="405"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Snowflake Sink connector running&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The last screenshot shows the Snowflake Sink Connector actively running in Confluent. You can see that messages are being processed and delivered to Snowflake in near real time. This confirms that the pipeline is live: as soon as new weather data is produced in Kafka, it flows through Confluent and lands in Snowflake, where our stream and task keep the clean table continuously updated.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8u7zedzonoufnhx3ys7t.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8u7zedzonoufnhx3ys7t.png" alt=" " width="800" height="429"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Conclusion&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;By combining Confluent Kafka, the Snowflake Sink Connector, and Snowflake's native features like Streams and Tasks, we built a fully automated data pipeline that transforms raw JSON events into clean, query-ready tables. This approach follows modern data engineering best practices: separating raw and curated layers, automating ingestion and transformation, and ensuring data freshness with minimal manual effort.&lt;/p&gt;

&lt;p&gt;With this pipeline in place, every new weather event flows seamlessly from Kafka → Confluent → Snowflake, and ends up in a structured table optimised for analytics. It is scalable and reliable, the kind of design expected in a real-world data platform architecture.&lt;/p&gt;

&lt;p&gt;In my next article, I'll share the full Python scripts for producing, consuming, and automating this pipeline end-to-end, so you can replicate and extend it in your own projects.&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>automation</category>
      <category>tutorial</category>
      <category>sql</category>
    </item>
    <item>
      <title>Apache Kafka Deep Dive: Core Concepts, Data Engineering Applications, and Real-World Production Practices</title>
      <dc:creator>Kepha Mwandiki</dc:creator>
      <pubDate>Tue, 23 Sep 2025 14:48:53 +0000</pubDate>
      <link>https://dev.to/kepha_mwandiki/apache-kafka-deep-dive-core-concepts-data-engineering-applications-and-real-world-production-1c99</link>
      <guid>https://dev.to/kepha_mwandiki/apache-kafka-deep-dive-core-concepts-data-engineering-applications-and-real-world-production-1c99</guid>
      <description>&lt;p&gt;Apache Kafka is an open-source distributed event streaming platform.&lt;/p&gt;

&lt;p&gt;&lt;u&gt;What does this mean?&lt;/u&gt; - Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single solution:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;To store streams of events durably and reliably for as long as you want.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;To process streams of events as they occur.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;How Does Kafka Work?&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol.&lt;/p&gt;

&lt;p&gt;&lt;u&gt;Server/Broker&lt;/u&gt; - The server runs the Kafka software and is responsible for:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Receiving messages from producers&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Storing them in topics &amp;amp; partitions&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Serving them to consumers when requested&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Depending on hardware and configuration, a broker can host thousands of partitions, and a cluster can sustain millions of messages per second.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;i. Kafka clients&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;They allow you to write distributed applications and services that read, write, and process streams of events in parallel, at scale, and in a fault-tolerant manner even in the case of network problems or machine failures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;ii. Producer clients&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Apps that send data into Kafka topics.&lt;/p&gt;

&lt;p&gt;Example: My weather script → sends weather JSON to Kafka.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;iii. Consumer clients&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Apps that read data from Kafka topics.&lt;/p&gt;

&lt;p&gt;Example: Snowflake loader → consumes weather data and inserts into a table.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;iv. Admin clients&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Used to manage Kafka: creating topics, configuring partitions, inspecting clusters, and so on.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;2. &lt;u&gt;Apache Kafka Core Concepts.&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fno2kz3hbc2oxmjmm37z3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fno2kz3hbc2oxmjmm37z3.png" alt=" " width="800" height="533"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;2.1 Producer&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;An application that sends messages (records/events) into Kafka.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from kafka import KafkaProducer
import json
import time

# Create a KafkaProducer instance
# Using a JSON serializer for demonstration purposes
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Define the topic to send messages to
topic_name = 'My_Kafka_Article'

# Send a few messages
for i in range(3):
    message_data = {"id": i, "message": f"This is message number {i}"}
    print(f"Sending message: {message_data}")
    producer.send(topic_name, value=message_data)
    time.sleep(1) # Simulate some delay

# Ensure all messages are sent
producer.flush()

# Close the producer
producer.close()

print("Kafka Producer finished sending messages.")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;&lt;u&gt;2.2 Consumer&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;An application that reads messages from Kafka.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from kafka import KafkaConsumer
import json

# 1. Create a Kafka Consumer Instance
consumer = KafkaConsumer(
    'My_Kafka_Article',
    bootstrap_servers='localhost:9092',
    # Deserialize JSON data
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# 2. Poll for Messages and Process Them
for message in consumer:
    print(f"Received message: {message.value}")

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
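
&lt;p&gt;The serializer and deserializer lambdas in the two snippets above are meant to be inverses of each other. The following is a minimal sketch, using only the standard library and no running broker, that checks the round trip; the sample event is made up for illustration.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json

# Same lambdas as in the producer and consumer snippets
serialize = lambda v: json.dumps(v).encode('utf-8')
deserialize = lambda x: json.loads(x.decode('utf-8'))

# Hypothetical event, shaped like the producer's messages
event = {"id": 0, "message": "This is message number 0"}

payload = serialize(event)       # bytes, the form sent to the topic
restored = deserialize(payload)  # dict, the form seen in message.value

print(type(payload).__name__, restored == event)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;If the two functions ever drift apart (for example, a different encoding on one side), the consumer will fail to decode what the producer wrote.&lt;/p&gt;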



&lt;p&gt;&lt;strong&gt;&lt;u&gt;2.3 Topic&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;A named category or channel in Kafka to which events are written and from which they are read.&lt;br&gt;
Below is an example of some of my topics.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5fstrrg15ldkknmdzune.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5fstrrg15ldkknmdzune.png" alt=" " width="800" height="381"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;2.4 Partition&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Topics are partitioned, meaning a topic is spread over a number of "buckets" located on different Kafka brokers. This distributed placement of your data is very important for scalability because it allows client applications to both read and write the data from/to many brokers at the same time.&lt;/p&gt;
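
&lt;p&gt;One common way records end up in a particular "bucket" is by hashing the record key. Below is a rough sketch of that idea, not Kafka's actual partitioner: Kafka's Java client uses a murmur2 hash, while this example uses crc32 from the standard library as a stand-in. The partition count and the keys are assumptions for illustration.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import zlib

NUM_PARTITIONS = 3  # hypothetical partition count for the topic

def choose_partition(key, num_partitions=NUM_PARTITIONS):
    """Map a record key to a partition index with a stable hash."""
    # crc32 is a stand-in here; it is not the hash Kafka uses
    return zlib.crc32(key.encode('utf-8')) % num_partitions

# The same key always maps to the same partition
for key in ["nairobi", "mombasa", "kisumu", "nairobi"]:
    print(key, "goes to partition", choose_partition(key))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because the mapping is deterministic, all events with the same key land in the same partition, which preserves their relative order.&lt;/p&gt;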

&lt;p&gt;&lt;strong&gt;&lt;u&gt;2.5 Broker&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;A Kafka server that stores data and serves producers/consumers.&lt;/p&gt;

&lt;p&gt;Many brokers form a Kafka cluster.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;2.6 Cluster&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;A cluster is a group of brokers working together; Kafka distributes topics and their partitions across them.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;3. Data Engineering Applications for Kafka&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;3.1 Real-Time Data Ingestion&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;This is the process of bringing real-time data from various data sources and streaming it into a warehouse, lake or a streaming platform.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;3.2 Log Aggregation&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Log aggregation collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing. Kafka abstracts away the details of files and gives a cleaner abstraction of log or event data as a stream of messages. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;3.2.1 Log Aggregation Pipeline with Kafka&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;3.2.1.1 Log Sources - Producers:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Applications &lt;/li&gt;
&lt;li&gt;Web servers (Apache)&lt;/li&gt;
&lt;li&gt;System logs (via Fluentd, Filebeat, syslog → Kafka)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;3.2.1.2 Kafka Topics:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Logs are written into topics like app_logs, error_logs, access_logs.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;3.2.1.3 Consumers:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;HDFS/S3 → long-term storage.&lt;/li&gt;
&lt;li&gt;Monitoring tools → Grafana.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;3.3 Website Activity Tracking&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;3.4 Stream processing&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Stream processing means working with continuous flows of data (streams) in real time.&lt;/p&gt;

&lt;p&gt;For example, instead of analyzing yesterday’s sales, you process every transaction as it happens.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;3.4.1 Typical Flow of stream processing&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;u&gt;Producers&lt;/u&gt; → send data into Kafka (clicks, IoT, transactions).&lt;br&gt;
&lt;u&gt;Kafka Topics&lt;/u&gt; → receive and store the messages.&lt;br&gt;
&lt;u&gt;Stream Processor&lt;/u&gt; (Kafka Streams) → processes data in real time.&lt;/p&gt;

&lt;p&gt;The screenshot below shows real-time data being queried in Confluent, a Kafka-based streaming platform.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F13s1u7samuejjl4zc567.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F13s1u7samuejjl4zc567.png" alt=" " width="800" height="260"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;4 Real-World Production Practices Using Kafka&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;u&gt;&lt;strong&gt;4.1 Sports&lt;/strong&gt;&lt;/u&gt;&lt;/p&gt;

&lt;p&gt;Millions of fans worldwide need real-time updates on scores, player stats, and events.&lt;/p&gt;

&lt;p&gt;&lt;u&gt;How Kafka helps:&lt;/u&gt;&lt;/p&gt;

&lt;p&gt;Producers → stadium sensors, referee systems (VAR), commentary feeds.&lt;/p&gt;

&lt;p&gt;Kafka Topics → scores, player_stats, tracking_data.&lt;/p&gt;

&lt;p&gt;Consumers → mobile apps such as LiveScore and FotMob get instant updates.&lt;/p&gt;

&lt;p&gt;Stream processors → aggregate, filter, and push alerts, e.g. notifying fans of a "Goal Scored" event.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;&lt;u&gt;4.2 Banking &amp;amp; Finance&lt;/u&gt;&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Fraud needs to be caught in seconds. Payments must be processed quickly, with no duplicates.&lt;/p&gt;

&lt;p&gt;&lt;u&gt;How Kafka helps:&lt;/u&gt;&lt;/p&gt;

&lt;p&gt;Producers → ATMs, mobile banking apps.&lt;/p&gt;

&lt;p&gt;Kafka Topics → transactions, fraud_alerts.&lt;/p&gt;

&lt;p&gt;Stream processors → aggregate transactions per user in a 5-minute window.&lt;/p&gt;

&lt;p&gt;They also flag anomalies, e.g. raising an alert when a user makes too many withdrawals within 5 minutes.&lt;/p&gt;

&lt;p&gt;Consumers → fraud detection systems, real-time dashboards, data warehouse for historical data.&lt;/p&gt;
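
&lt;p&gt;The 5-minute window check described above can be sketched in plain Python. This is only an illustration of the windowing logic, not Kafka Streams code: the threshold, the user id, and the timestamps are made up, and events are assumed to arrive in time order.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from bisect import bisect_right
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # the 5-minute window
MAX_WITHDRAWALS = 3      # hypothetical limit per window

# Maps each user id to a sorted list of withdrawal timestamps (seconds)
history = defaultdict(list)

def record_withdrawal(user_id, timestamp):
    """Store the event and return True if the user exceeds the limit."""
    times = history[user_id]
    times.append(timestamp)  # assumes events arrive in time order
    # Drop timestamps that are a full window or more older than this event
    cutoff = bisect_right(times, timestamp - WINDOW_SECONDS)
    del times[:cutoff]
    return len(times) > MAX_WITHDRAWALS

# Four withdrawals within three minutes, then one much later
events = [("user1", 0), ("user1", 60), ("user1", 120), ("user1", 180), ("user1", 900)]
flags = [record_withdrawal(user, ts) for user, ts in events]
print(flags)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Only the fourth withdrawal is flagged; by the time the fifth arrives, the earlier ones have fallen out of the window.&lt;/p&gt;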

&lt;p&gt;&lt;u&gt;&lt;strong&gt;4.3 Healthcare&lt;/strong&gt;&lt;/u&gt;&lt;/p&gt;

&lt;p&gt;Patient vitals, e.g. oxygen level and heart rate, must be monitored continuously so that problems are caught and followed up in real time.&lt;/p&gt;

&lt;p&gt;&lt;u&gt;How Kafka helps:&lt;/u&gt;&lt;/p&gt;

&lt;p&gt;Producers → IoT devices on patients, e.g. wearables and hospital monitors.&lt;/p&gt;

&lt;p&gt;Kafka Topics → patient_vitals, alerts.&lt;/p&gt;

&lt;p&gt;Stream processors → check thresholds, e.g. a heart rate greater than 180 bpm, and trigger emergency alerts instantly.&lt;/p&gt;

&lt;p&gt;Consumers → doctor dashboards, alert systems, patient history databases.&lt;/p&gt;
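
&lt;p&gt;As a rough sketch of the threshold check, the function below turns a vitals reading into an alert when the heart rate is above 180 bpm. The field names and the sample readings are assumptions for illustration; in a real pipeline the readings would come from the patient_vitals topic and the alerts would be written to the alerts topic.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;HEART_RATE_LIMIT = 180  # bpm, the example threshold

def check_vitals(reading):
    """Return an alert dict if the heart rate is above the limit, else None."""
    if reading["heart_rate"] > HEART_RATE_LIMIT:
        return {
            "patient_id": reading["patient_id"],
            "alert": "heart rate above limit",
            "heart_rate": reading["heart_rate"],
        }
    return None

# Hypothetical readings from two patients
readings = [
    {"patient_id": "p1", "heart_rate": 72},
    {"patient_id": "p2", "heart_rate": 191},
]

# Keep only the readings that produced an alert
alerts = [a for a in map(check_vitals, readings) if a is not None]
print(alerts)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;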

</description>
      <category>architecture</category>
      <category>dataengineering</category>
      <category>kafka</category>
    </item>
    <item>
      <title>APACHE AIRFLOW AND ITS IMPORTANCE IN DATA ENGINEERING</title>
      <dc:creator>Kepha Mwandiki</dc:creator>
      <pubDate>Sun, 07 Sep 2025 13:37:43 +0000</pubDate>
      <link>https://dev.to/kepha_mwandiki/apache-airflow-and-its-importance-in-data-engineering-jcl</link>
      <guid>https://dev.to/kepha_mwandiki/apache-airflow-and-its-importance-in-data-engineering-jcl</guid>
      <description>&lt;p&gt;&lt;strong&gt;Apache Airflow&lt;/strong&gt; - This is a tool for workflow orchestration: the automated coordination and management of data workflows.&lt;br&gt;
Airflow is important in data engineering because it provides a way to orchestrate, schedule, and monitor workflows and pipelines.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Why use Airflow&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Scalability and Flexibility&lt;/strong&gt; - Airflow supports workflows ranging from small scripts to large pipelines that process very large volumes of data.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Airflow works with many systems: databases, cloud storage, Snowflake, etc.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Scheduling&lt;/strong&gt; - Airflow has a built-in scheduler that runs tasks at specific intervals and automates repetitive tasks, reducing manual intervention.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Monitoring&lt;/strong&gt; - Airflow provides an interface to track task execution, progress, successes and failures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Extensibility&lt;/strong&gt; - Airflow provides plugins and extensions for connecting to various systems, e.g. APIs, AWS, and Azure.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Error handling&lt;/strong&gt; - Airflow makes error handling automated, flexible, and visible. Instead of constantly watching your pipelines, you can configure retries and failure alerts so that problems are handled or reported automatically.&lt;/p&gt;
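
&lt;p&gt;As a small illustration, retries and failure notifications are typically set through a default_args dictionary that is passed to a DAG. The sketch below only builds that dictionary, so it runs without Airflow installed. The keys retries, retry_delay, and on_failure_callback are standard Airflow task arguments; the notify_failure function and the chosen values are hypothetical.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from datetime import timedelta

def notify_failure(context):
    """Hypothetical callback; Airflow passes the task context as a dict."""
    print("Task failed:", context.get("task_instance"))

# Settings applied to every task in a DAG that receives these default_args
default_args = {
    "retries": 2,                           # re-run a failed task up to 2 times
    "retry_delay": timedelta(minutes=5),    # wait between attempts
    "on_failure_callback": notify_failure,  # called when the task finally fails
}

print(default_args["retries"], default_args["retry_delay"])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;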

&lt;h2&gt;
  
  
  Screenshot Documentation
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Airflow UI header with "Apache Airflow" logo&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcde8oe7b9xpfdf39g5i5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fcde8oe7b9xpfdf39g5i5.png" alt=" " width="800" height="149"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;DAGs list showing example DAGs&lt;/strong&gt;&lt;br&gt;
In Airflow, a Directed Acyclic Graph (DAG) is a defined set of instructions that tells Airflow which tasks to run and in what order.&lt;br&gt;
Screenshots of example DAGs in Airflow:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1nmp3ig2qnxmpqtclqc3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1nmp3ig2qnxmpqtclqc3.png" alt=" " width="800" height="364"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjk5p0y7hyhxwcy7uo2sh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fjk5p0y7hyhxwcy7uo2sh.png" alt=" " width="800" height="302"&gt;&lt;/a&gt;&lt;/p&gt;
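
&lt;p&gt;To make the "what order" part concrete, here is a minimal sketch of how a set of task dependencies resolves into a run order. It uses Python's standard graphlib module (Python 3.9 or later) rather than Airflow itself, and the task names are made up for illustration.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (hypothetical names)
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}

# A valid run order never places a task before its dependencies
order = list(TopologicalSorter(dependencies).static_order())
print(order)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;"Acyclic" matters here: if two tasks depended on each other, no valid order would exist and graphlib would raise a CycleError.&lt;/p&gt;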

&lt;p&gt;&lt;strong&gt;Airflow Scheduler&lt;/strong&gt;&lt;br&gt;
The scheduler is the Airflow component that decides which tasks should run and when.&lt;br&gt;
It is responsible for triggering DAG runs and for managing how many runs execute at a given time.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F05hy2vb4g4byld67d1v8.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F05hy2vb4g4byld67d1v8.png" alt=" " width="800" height="430"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Airflow Webserver&lt;/strong&gt;&lt;br&gt;
The Airflow Webserver is the component that provides the Graphical User Interface (GUI) for Airflow. It is the part you interact with in your browser to view, monitor, and manage your DAGs and tasks.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fh100rmk9di7hckv0nl15.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fh100rmk9di7hckv0nl15.png" alt=" " width="800" height="428"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Below is an image showing both the Webserver and the Scheduler running:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fl9u1mpt59c4j5on5b3hj.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fl9u1mpt59c4j5on5b3hj.png" alt=" " width="800" height="423"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Below is an example of a DAG in my browser, showing its tasks, the first run, the most recent run, the success or failure of some of the runs, and how the tasks are scheduled.&lt;/p&gt;

&lt;p&gt;The DAG is running on my localhost:8080&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fanvsmu8u5oxykjaz56sm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fanvsmu8u5oxykjaz56sm.png" alt=" " width="800" height="415"&gt;&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Installing and setting up PostgreSQL on a Linux server</title>
      <dc:creator>Kepha Mwandiki</dc:creator>
      <pubDate>Sun, 03 Aug 2025 21:59:09 +0000</pubDate>
      <link>https://dev.to/kepha_mwandiki/installing-and-setting-up-postgresql-on-a-linux-server-2k1e</link>
      <guid>https://dev.to/kepha_mwandiki/installing-and-setting-up-postgresql-on-a-linux-server-2k1e</guid>
      <description>&lt;p&gt;&lt;strong&gt;Step 1: Updating the system&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;You should update your server packages to ensure all existing packages are up to date. This can be done by running the commands:&lt;br&gt;
sudo apt update&lt;br&gt;
sudo apt upgrade -y&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 2: Installing PostgreSQL&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The following command installs PostgreSQL and its additional contrib packages:&lt;br&gt;
sudo apt install postgresql postgresql-contrib&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 3: Verifying the installation.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The PostgreSQL service should start automatically once installation completes. Use the following command to verify that it is running:&lt;/p&gt;

&lt;p&gt;sudo systemctl status postgresql&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 4: Accessing the PostgreSQL shell&lt;/strong&gt;&lt;br&gt;
PostgreSQL creates a Linux user ‘postgres’ by default. Switch to this user, then start the PostgreSQL shell with psql:&lt;/p&gt;

&lt;p&gt;sudo -i -u postgres&lt;br&gt;
psql&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 5: Creating a new user and Database&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;To create a new user, run the following command in the PostgreSQL shell:&lt;br&gt;
CREATE USER user1 WITH PASSWORD 'password';&lt;br&gt;
Then create a database owned by that user:&lt;br&gt;
CREATE DATABASE mydatabase WITH OWNER user1;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Step 6: Secure your installation with a firewall&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Use a firewall such as ufw on Ubuntu to restrict external access. If you need remote connections, allow the default PostgreSQL port, 5432, through the firewall:&lt;/p&gt;

&lt;p&gt;sudo ufw allow 5432/tcp&lt;br&gt;
sudo ufw reload&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;End.&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;You have now successfully installed and set up PostgreSQL on your Linux server. You have created a user and a database, and configured the firewall for the PostgreSQL port.&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
