DEV Community

Cover image for From Kafka to Clean Tables: Building a Confluent Snowflake Pipeline with Streams & Tasks
Kepha Mwandiki
Kepha Mwandiki

Posted on

From Kafka to Clean Tables: Building a Confluent Snowflake Pipeline with Streams & Tasks

Building reliable data pipelines often starts with messy JSON and ends with clean, analytics-ready tables. In this article, we will walk through the complete journey of streaming data from Kafka into Snowflake using the Confluent Snowflake Sink Connector. We will begin by generating private and public keys to authenticate Confluent with Snowflake, then set up the connector so raw json events land in a Snowflake table. From there, we will use Snowflake Streams and Tasks to continuously transform that nested json into a structured table. By the end, you’ll have a fully automated pipeline where Kafka pushes data through Confluent, Snowflake stores the raw records, and your clean tables stay updated in real-time, ready for dashboards, analytics, and reporting.

Setting up Confluent - Environment, cluster and Topic

Confluent Environment

  • An environment acts as a logical workspace in Confluent Cloud where you can group together clusters, topics and connectors for a specific project.

  • In the screenshot below, you can see the environment dashboard, which is the starting point before spinning up a Kafka cluster and creating topics to hold our streaming data.

Confluent Cluster

  • The cluster is where your topics live and where all the data streaming happens.
  • In Confluent Cloud, you can choose between Basic, Standard, or Dedicated clusters depending on workload.

  • For this pipeline, a standard/basic cluster is sufficient to handle ingesting weather data into our topic before pushing it downstream to Snowflake.

Cloud Provider

  • When creating the cluster, Confluent asks you to pick a cloud provider and region (AWS or Azure etc).
  • This choice is more than just location, it affects latency, data transfer costs, and overall performance. _ - Always select the same cloud provider and region as your Snowflake account_.
  • In our case, Snowflake is hosted on AWS af-south-1 (Cape Town), so we placed our Confluent cluster in the same AWS region to ensure low latency and avoid unnecessary cross-region data transfer fees.

Confluent Client

  • After the cluster is provisioned, Confluent provides client connection configurations so that applications can produce or consume messages from the cluster.
  • Since our producer script is written in Python, we download the Python client configuration.

  • This file contains important details like the bootstrap servers, API key, and API secret, which authenticate the Python client to the Confluent cluster securely.

  • These credentials are later used in our weather data producer script to publish json events (such as city, temperature, humidity, etc.) into the Kafka topic.

Creating a Topic

  • The final step in setting up Confluent is creating a Kafka Topic inside the cluster.
  • A topic is a channel where streaming data is stored and organized before being delivered downstream.

Data streaming into our Topic

Once the topic is created, we can begin streaming data into it.

  • In the screenshot above, you can see messages arriving in Confluent, which confirms that the pipeline is working.
  • This data is being produced by a Python producer script that I wrote earlier, which fetches live weather information and publishes it into the Kafka topic.
  • On the other side, a simple Python consumer can also be used to verify that messages are flowing correctly before we connect downstream systems like Snowflake.

Confluent Connectors and sinks

Confluent offers a wide range of connectors that make it easy to move data between Kafka and external systems without writing custom code.
In the screenshot below, you can see several available connectors sources to bring data into Kafka and sinks to deliver data out of Kafka.

For our use case, we are interested in the Snowflake Sink Connector, because our goal is to continuously push streaming weather data from Kafka into Snowflake.

The sink connector ensures that every new event in our topic is automatically written into a Snowflake table, making it available for storage and downstream analytics.

Snowflake SinK Connector

  • When setting up the Snowflake Sink Connector, the first step is to tell it which Kafka topic to read from.
  • In this case, we select our previously created weather topic as the source.
  • This ensures that every json message produced into the topic containing fields like city, temperature, and humidity is automatically ingested by the connector and prepared for delivery into Snowflake.
  • By binding the connector to the topic, we establish the direct pipeline from Confluent into our Snowflake account.

Sink connector API

To authenticate the connector, Confluent requires an API key and secret that grant access to the Kafka cluster. In our case we will use the API we generated when creating this cluster.

Configuring Snowflake Connection details

The next step is to configure the Snowflake connection details.

  • Here we provide the account URL, which is derived from the Snowflake account locator and region eg https://xc15924.af-south-1.aws.snowflakecomputing.com .
  • We also specify the database, schema, and warehouse that will receive the data. All these should already be created in Snowflake.
  • Finally, we configure the Snowflake user and attach the public key to enable key-pair authentication.
  • This ensures the connector can securely deliver data into the right Snowflake environment without needing to store passwords.

Getting your Public and Private keys

Public

  • The public key is derived from the private key and is the one uploaded to Snowflake.
  • This allows Snowflake to validate signatures created with the private key. Here's the command to extract the public key:
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

cat rsa_key.pub

sed -e '1d' -e '$d' rsa_key.pub | tr -d '\n' > rsa_key.pub.single
at rsa_key.pub.single

cat rsa_key.pub.single
Enter fullscreen mode Exit fullscreen mode

Private

  • The private key is generated locally and never shared with Snowflake.
  • It is used by the Snowflake Sink Connector to sign authentication requests.
  • Below is the command I used to create a private key in PKCS8 format required by Snowflake
openssl genrsa 2048 | openssl pkcs8 -topk8 -nocrypt -out rsa_key.p8

cat rsa_key.p8

Enter fullscreen mode Exit fullscreen mode

After getting the public key, you should now save it in snowflake.

Below is the code for that, USERNAME = your snowflake username.

ALTER USER USERNAME SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';

The following code will confirm whether the public key has been set:

SHOW PARAMETERS LIKE 'RSA_PUBLIC_KEY' FOR USER USERNAME;

Data format and ingestion method

  • In the next step, the connector asks you to choose the data format and the ingestion method.
  • For this pipeline, we select json as the format since our weather producer sends data in json structure.
  • Snowflake offers two ingestion options: Snowpipe - micro-batch loading and Snowpipe - Streaming low-latency streaming ingestion.
  • While both work, Snowpipe Streaming provides near real-time delivery into Snowflake, making it better suited for continuously flowing data like weather updates.

Sizing

  • The final step in configuring the connector is sizing, where you choose the number of tasks.
  • Each task is a worker instance that reads from Kafka and writes to Snowflake.
  • For small workloads, 1 task is enough.
  • If you expect higher throughput, you can increase the task count to scale out ingestion.
  • In our weather data pipeline, a single task is sufficient to handle the incoming JSON events.

Launching The Snowflake Sink Connector

  • The final step is just confirming everything, and launching the connector.

Data arriving into snowflake warehouse

  • When the Sink Connector pushes data into Snowflake, it creates a staging table that only has two columns RECORD_METADATA which has Kafka details like topic, partition, offset, and timestamp and RECORD_CONTENT which stores the full message as json.
  • This makes sure all raw events are captured safely, but it’s not easy to query since everything sits inside one big json field. That's why we later use Streams and Tasks to transform this raw data into a clean, structured table with proper columns for analytics.

Creating a new structured table

Now that the raw json is safely landing in Snowflake, the next step is to create a clean target table where our structured weather data will live. We will call this table WEATHER_DATA_TABLE, and it will have properly defined columns like city, temperature, humidity, wind_speed, and others.

This table will act as the final destination for all transformed data, making it much easier to query and analyse compared to the raw RECORD_CONTENT json.

Here is the SQL code for that:

-- creating a new table WEATHER_DATA_TABLE
CREATE OR REPLACE TABLE WEATHER_DATA_TABLE (
 CITY          STRING,
LATITUDE      FLOAT,
LONGITUDE     FLOAT,
TEMPERATURE   FLOAT,
TEMP_MIN      FLOAT,
TEMP_MAX      FLOAT,
PRESSURE      INT,
HUMIDITY      INT,
WIND_SPEED    FLOAT,
CLOUDS        INT,
W_CONDITION   STRING,
DESCRIPTION   STRING,
TIME_STAMP    TIMESTAMP
);

Enter fullscreen mode Exit fullscreen mode

Inserting raw data from RECORD_CONTENT column into our new Structured table

After creating WEATHER_DATA_TABLE, the next step was to insert data from the raw table's RECORD_CONTENT column. All the weather details like city, temperature, humidity, wind_speed, and others were stored as json inside RECORD_CONTENT. Using Snowflake's json functions, we pulled out each field from this json and placed it into the right column of WEATHER_DATA_TABLE. This turned the raw json in RECORD_CONTENT into a clean and structured table that is much easier to query and analyse.

-- Inserting data from RECORD_CONTENT into the new table
INSERT INTO WEATHER_DATA_TABLE
SELECT
RECORD_CONTENT:city::string,
RECORD_CONTENT:latitude::float,
RECORD_CONTENT:longitude::float,
RECORD_CONTENT:temperature::float,
RECORD_CONTENT:temp_min::float,
RECORD_CONTENT:temp_max::float,
RECORD_CONTENT:pressure::int,
RECORD_CONTENT:humidity::int,
RECORD_CONTENT:wind_speed::float,
RECORD_CONTENT:clouds::int,
RECORD_CONTENT:w_condition::string,
RECORD_CONTENT:description::string,
TO_TIMESTAMP(RECORD_CONTENT:time_stamp::int)
FROM TOPIC_WEATHER_STREAM;
Enter fullscreen mode Exit fullscreen mode

Creating a stream that tracks new changes in the TOPIC_WEATHER_STREAM table

To keep the clean table updated as new data arrives, we created a Snowflake Stream on the raw topic table. A stream works like a tracker, it records all the new rows or changes that come into the raw table. Instead of repeatedly scanning the whole table, we can just look at the stream to know what fresh data has arrived. This makes it easy to continuously insert only the latest weather records from RECORD_CONTENT into WEATHER_DATA_TABLE.

-- Creating A Snowflake stream tracks changes, new rows in a table.
CREATE OR REPLACE STREAM WEATHER_STREAM_CHANGES 
ON TABLE TOPIC_WEATHER_STREAM
APPEND_ONLY = TRUE;
Enter fullscreen mode Exit fullscreen mode

Creating a Snowflake Task that automatically takes data from the stream inserting into our new table

After setting up the stream, the next step was to create a Snowflake Task.

  • A task is like a scheduler inside Snowflake, it can automatically run SQL statements at regular intervals. In our case, we used the task to read new data from the stream and insert it into WEATHER_DATA_TABLE. This way, we don’t have to manually run the INSERT query every time fresh weather data arrives.

    • The task checks the stream, finds any new rows in RECORD_CONTENT, and then copies the values into the correct columns of WEATHER_DATA_TABLE. By combining the stream which tracks changes, with the task which automates inserts, our pipeline becomes fully automated. Every time Kafka pushes new weather data through Confluent into Snowflake, the clean table updates on its own.
-- creating a task that will run automatically  insert new rows into WEATHER_DATA_TABLE.

CREATE OR REPLACE TASK WEATHER_TASK
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
INSERT INTO WEATHER_DATA_TABLE
SELECT DISTINCT
RECORD_CONTENT:city::string,
RECORD_CONTENT:latitude::float,
RECORD_CONTENT:longitude::float,
RECORD_CONTENT:temperature::float,
RECORD_CONTENT:temp_min::float,
RECORD_CONTENT:temp_max::float,
RECORD_CONTENT:pressure::int,
RECORD_CONTENT:humidity::int,
RECORD_CONTENT:wind_speed::float,
RECORD_CONTENT:clouds::int,
RECORD_CONTENT:w_condition::string,
RECORD_CONTENT:description::string,
TO_TIMESTAMP(RECORD_CONTENT:time_stamp::int)
FROM WEATHER_STREAM_CHANGES;
Enter fullscreen mode Exit fullscreen mode
  • Once the task was created, the final step was to enable it so it could start running on its own. By default, a new task in Snowflake is created in a suspended state, meaning it won’t execute until you turn it on. We enabled the task with a single SQL command, and from that moment, Snowflake automatically began checking the stream and inserting new weather data into WEATHER_DATA_TABLE on schedule. This completed the automation, ensuring the clean table always stays up to date without any manual work.

-- Enabling the task created above
ALTER TASK WEATHER_TASK RESUME;

Snowflake full setup showing our tables, stream and task

In this screenshot, you can see the full setup in Snowflake: the raw topic table holding json in RECORD_CONTENT, the clean WEATHER_DATA_TABLE where structured data lives, the stream that tracks new records, and the task that automates the inserts. Together, these pieces form the end-to-end pipeline - from raw Kafka events landing in Snowflake, all the way to a clean, continuously updated table that’s ready for analysis.

Querying directly from our WEATHER_DATA_TABLE

With the pipeline complete, we can now query WEATHER_DATA_TABLE just like any normal Snowflake table. Since the data is already structured into clean columns, queries are straightforward. For example, to see the latest weather updates, we can run:

SELECT *
FROM WEATHER_DATA_TABLE
ORDER BY TIME_STAMP DESC
LIMIT 110;
Enter fullscreen mode Exit fullscreen mode

Result showing our clean data in rows & columns

In this screenshot, we can see the results of querying WEATHER_DATA_TABLE. Unlike the raw json format, the data is now well organised into proper columns; city, temperature, humidity, windspeed, timestamp, and more. Everything is clean and easy to read, which makes analysis simple and efficient.

This confirms that our pipeline successfully transformed the raw Kafka events into a structured Snowflake table ready for use.

Snowflake Sink connector running

The last screenshot shows the Snowflake Sink Connector actively running in Confluent. You can see that messages are being processed and delivered to Snowflake in real time. This confirms that the pipeline is live, as soon as new weather data is produced in Kafka, it flows through Confluent and lands in Snowflake, where our stream and task keep the clean table continuously updated.

Conclusion

By combining Confluent Kafka, the Snowflake Sink Connector, and Snowflake's native features like Streams and Tasks, we built a fully automated data pipeline that transforms raw JSON events into clean, query-ready tables. This approach follows modern data engineering best practices separating raw and curated layers, automating ingestion and transformation, and ensuring data freshness with minimal manual effort.

With this pipeline in place, every new weather event flows seamlessly from Kafka → Confluent → Snowflake, and ends up in a structured table optimised for analytics. It is scalable, reliable, and production ready, the kind of design expected in a real-world data platform architecture.

In my next article, I'll share the full Python scripts for producing, consuming, and automating this pipeline end-to-end, so you can replicate and extend it in your own projects.

Top comments (1)

Collapse
 
elisha_lukalia_f382f081a0 profile image
elisha lukalia

Good stuff.