In this article we will take a raw dataset of Kenyan food prices, recorded in different regions of the country over the years, and build a complete Extraction, Transformation and Loading (ETL) pipeline around it for analysis.
We'll go from a CSV file to a clean, query-optimized database and a beautiful, interactive Grafana dashboard.
Project Architecture
Our data pipeline for analysis will look like this:
- Source: A World Food Programme CSV file containing Kenyan food prices across different provinces and regions since 2006.
- ETL Engine: A Python script that uses Pandas to extract, transform and load the data into a database designed for analysis.
- Data Warehouse: A PostgreSQL database modeled with a star schema.
- Visualization Layer: A Grafana dashboard that queries the database to get insights.
We use a star schema: a central fact table holding quantitative measures (like price), surrounded by dimension tables holding descriptive attributes (like location and commodity).
Step 1: Python Environment and Database Configuration
1. Set Up the Python Virtual Environment
Create a virtual environment and install necessary libraries.
python -m venv venv
source venv/bin/activate # For macOS/Linux
# .\venv\Scripts\activate # For Windows
# Create a requirements.txt file
touch requirements.txt
Add the following to requirements.txt:
pandas
SQLAlchemy
psycopg2-binary
python-dotenv
Install them:
pip install -r requirements.txt
2. Create the Database and Tables
In your PostgreSQL instance, create a database.
CREATE DATABASE kenya_food_prices;
Connect to the created database and use this SQL script to set up a star schema.
The UNIQUE constraints in this script are key to preventing duplicate dimension data.
CREATE TABLE dim_date (
date_id SERIAL PRIMARY KEY,
date_value DATE NOT NULL UNIQUE,
year INTEGER NOT NULL,
month INTEGER NOT NULL,
day INTEGER NOT NULL
);
CREATE TABLE dim_location (
location_id SERIAL PRIMARY KEY,
admin1 VARCHAR(255),
admin2 VARCHAR(255),
market VARCHAR(255),
UNIQUE (admin1, admin2, market)
);
CREATE TABLE dim_commodity (
commodity_id SERIAL PRIMARY KEY,
category VARCHAR(255),
commodity_name VARCHAR(255),
unit VARCHAR(255),
UNIQUE (category, commodity_name, unit)
);
CREATE TABLE dim_market_type (
market_type_id SERIAL PRIMARY KEY,
market_type VARCHAR(255) UNIQUE
);
CREATE TABLE fact_food_prices (
price_id SERIAL PRIMARY KEY,
date_id INTEGER REFERENCES dim_date(date_id),
location_id INTEGER REFERENCES dim_location(location_id),
commodity_id INTEGER REFERENCES dim_commodity(commodity_id),
market_type_id INTEGER REFERENCES dim_market_type(market_type_id),
price_kes NUMERIC(10, 2),
price_usd NUMERIC(10, 2),
UNIQUE (date_id, location_id, commodity_id, market_type_id)
);
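After running this script, it's worth confirming that all five tables exist. Here is a minimal sanity check from Python using SQLAlchemy's inspector (the connection string is a placeholder; substitute your own credentials):
from sqlalchemy import create_engine, inspect

# Placeholder credentials for illustration only
engine = create_engine("postgresql://your_postgres_user:your_postgres_password@localhost:5432/kenya_food_prices")

inspector = inspect(engine)
print(inspector.get_table_names())
# Expect: dim_date, dim_location, dim_commodity, dim_market_type, fact_food_prices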
3. Store Credentials Securely
Create a .env file in the project root to safely store your credentials.
DB_HOST=localhost
DB_NAME=kenya_food_prices
DB_USER=your_postgres_user
DB_PASSWORD=your_postgres_password
DB_PORT=5432
We will use python-dotenv to load these credentials securely in our Python script.
Step 2: Python ETL Script
Part 1: Extraction and Transformation
First, we use Pandas to read and extract the CSV.
The CSV file has an extra header row we need to skip and messy column names.
import pandas as pd
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine
# Load environment variables
load_dotenv()
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_port = os.getenv("DB_PORT")
conn_string = f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
engine = create_engine(conn_string)
def extraction_and_transformation():
    # Extraction: Read CSV, skipping the second row which is a comment
    df = pd.read_csv('wfp_food_prices_ken.csv', skiprows=[1])

    # Transformation
    df = df.dropna()
    df.columns = df.columns.str.strip()  # Clean column names
    df.drop_duplicates(inplace=True)
    df.reset_index(drop=True, inplace=True)

    print("Data extracted and transformed successfully!")
    return df
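A quick, optional way to check this step is to call the function and inspect the cleaned DataFrame:
df = extraction_and_transformation()
print(df.shape)             # number of rows and columns after cleaning
print(df.columns.tolist())  # confirm the cleaned column names
print(df.head())            # preview the first few records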
Part 2: Populating the Database
We make our script idempotent, meaning it can run multiple times without creating duplicate data.
We'll start with the location dimension:
def load_location_dimension(df):
    print("Loading location dimension...")

    # 1. Select unique locations from the source data
    location_cols = ['admin1', 'admin2', 'market']
    locations = df[location_cols].drop_duplicates()

    # 2. Get existing locations from the database
    existing = pd.read_sql("SELECT admin1, admin2, market FROM dim_location", engine)

    # 3. Find locations that are in our source but NOT in the database
    new_locations = locations.merge(existing, on=location_cols, how='left', indicator=True)
    new_locations = new_locations[new_locations['_merge'] == 'left_only'].drop('_merge', axis=1)

    # 4. If there are new locations, append them
    if not new_locations.empty:
        new_locations.to_sql('dim_location', engine, if_exists='append', index=False)
        print(f"Inserted {len(new_locations)} new location records!")
    else:
        print("✔️ No new location records to add.")
Use the same logic for the other dimensions!
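As an illustration, here is a sketch of the same idempotent pattern applied to the commodity dimension (the category, commodity and unit column names come from the WFP CSV):
def load_commodity_dimension(df):
    print("Loading commodity dimension...")

    # 1. Select unique commodities, renaming the CSV column to match the table
    commodities = df[['category', 'commodity', 'unit']].rename(
        columns={'commodity': 'commodity_name'}).drop_duplicates()

    # 2. Get existing commodities from the database
    existing = pd.read_sql("SELECT category, commodity_name, unit FROM dim_commodity", engine)

    # 3. Keep only rows that are not already in the database
    new_rows = commodities.merge(existing, on=['category', 'commodity_name', 'unit'],
                                 how='left', indicator=True)
    new_rows = new_rows[new_rows['_merge'] == 'left_only'].drop('_merge', axis=1)

    # 4. Append any new commodities
    if not new_rows.empty:
        new_rows.to_sql('dim_commodity', engine, if_exists='append', index=False)
        print(f"Inserted {len(new_rows)} new commodity records!")
    else:
        print("✔️ No new commodity records to add.")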
Loading the Fact Table
We replace the descriptive text (e.g., Beans, Nairobi) with the corresponding foreign keys (location_id, commodity_id) from our dimension tables.
We do this with a series of pd.merge calls, the Pandas equivalent of a SQL JOIN.
def load_fact_table(df):
    print("Loading fact table...")

    # Get dimension tables from the DB, now with their generated IDs
    date_dim = pd.read_sql("SELECT date_id, date_value FROM dim_date", engine)
    location_dim = pd.read_sql("SELECT location_id, admin1, admin2, market FROM dim_location", engine)
    commodity_dim = pd.read_sql("SELECT commodity_id, category, commodity_name, unit FROM dim_commodity", engine)
    # ... and so on for other dimensions

    # Prep the dataframe for merging ('usdprice' is the USD price column in the WFP CSV)
    fact_df = df.rename(columns={'commodity': 'commodity_name',
                                 'price': 'price_kes',
                                 'usdprice': 'price_usd'})
    fact_df['date_value'] = pd.to_datetime(df['date']).dt.date

    # Merge with dimensions to get foreign keys
    fact_df = fact_df.merge(date_dim, on='date_value', how='left')
    fact_df = fact_df.merge(location_dim, on=['admin1', 'admin2', 'market'], how='left')
    fact_df = fact_df.merge(commodity_dim, on=['category', 'commodity_name', 'unit'], how='left')
    # ... merge with other dimensions

    # Select only the columns we need for the fact table
    fact_columns = ['date_id', 'location_id', 'commodity_id', 'market_type_id', 'price_kes', 'price_usd']
    fact_table = fact_df[fact_columns].dropna()  # Drop rows where a join failed

    # Load into the database (with a similar idempotency check)
    fact_table.to_sql('fact_food_prices', engine, if_exists='append', index=False)
    print("Fact table loaded successfully!")
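To tie the pieces together, a minimal orchestration sketch might look like the following (load_date_dimension and load_market_type_dimension are the analogous loaders you would write for the remaining dimensions, following the same pattern as load_location_dimension):
def run_etl():
    df = extraction_and_transformation()
    # Dimensions must be loaded first so their IDs exist for the fact table
    load_date_dimension(df)
    load_location_dimension(df)
    load_commodity_dimension(df)
    load_market_type_dimension(df)
    load_fact_table(df)
    print("ETL pipeline completed!")

if __name__ == "__main__":
    run_etl()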
Step 3: Visualizing Data with Grafana
After cleaning and structuring, our data is now ready for Grafana.
1. Connect PostgreSQL to Grafana: In Grafana, go to Connections > Add new connection, select PostgreSQL, and enter your database credentials.
2. Create a New Dashboard: Add a panel, switch to the code editor, and enter your SQL queries.
Here are some queries to get you started.
Query 1: Price of a Commodity Over Time
Panel Type: Time Series
SELECT
d.date_value AS "time", -- Alias for Grafana's time axis
f.price_kes, -- The value to plot
c.commodity_name AS "metric" -- The name of the series
FROM
fact_food_prices AS f
JOIN
dim_date AS d ON f.date_id = d.date_id
JOIN
dim_commodity AS c ON f.commodity_id = c.commodity_id
JOIN
dim_location AS l ON f.location_id = l.location_id
WHERE
$__timeFilter(d.date_value) AND -- Use Grafana's time picker
l.market = 'Nairobi' AND
c.commodity_name = 'Maize (white)'
ORDER BY
d.date_value;
Query 2: Average Price Comparison by Commodity
Panel Type: Bar Chart
SELECT
c.commodity_name AS "Commodity",
AVG(f.price_kes) AS "Average Price (KES)"
FROM
fact_food_prices AS f
JOIN
dim_commodity AS c ON f.commodity_id = c.commodity_id
JOIN
dim_date AS d ON f.date_id = d.date_id
WHERE
$__timeFilter(d.date_value)
GROUP BY
c.commodity_name
ORDER BY
"Average Price (KES)" DESC
LIMIT 15;
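If you'd like to sanity-check a query before wiring it into Grafana, you can run a simplified version of Query 2 from the same Python environment (this drops the $__timeFilter macro, which only Grafana expands):
check = pd.read_sql("""
    SELECT c.commodity_name, AVG(f.price_kes) AS avg_price_kes
    FROM fact_food_prices f
    JOIN dim_commodity c ON f.commodity_id = c.commodity_id
    GROUP BY c.commodity_name
    ORDER BY avg_price_kes DESC
    LIMIT 15
""", engine)
print(check)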
We've taken raw data, applied ETL principles, modeled it for analytics, and built an interactive dashboard.