Austin Oketch

Data Platform for Analyzing Kenya’s Food Prices and Inflation Trends.

In this article, we'll take a raw dataset of Kenyan food prices, collected from regions across the country over the years, and build an Extraction, Transformation and Loading (ETL) pipeline that turns it into a complete data platform for analysis.

We'll go from a CSV file to a clean, query-optimized database and a beautiful, interactive Grafana dashboard.

Project Architecture

Our data pipeline for analysis will look like this:

  • Source: A World Food Programme CSV file containing Kenyan food prices across provinces and regions since 2006.
  • ETL Engine: A Python script that uses Pandas to extract, transform and load the data into the designated database.
  • Data Warehouse: A PostgreSQL database modeled with a Star Schema.
  • Visualization Layer: A Grafana dashboard that queries the database for insights.

We use a Star Schema: a central fact table holding quantitative measures (like price), surrounded by dimension tables holding descriptive attributes (like location and commodity).

Step 1: Python Environment and Database Configuration

1. Set Up the Python Virtual Environment

Create a virtual environment and install the necessary libraries.

python -m venv venv
source venv/bin/activate # For macOS/Linux
# .\venv\Scripts\activate # For Windows

# Create a requirements.txt file
touch requirements.txt

Add the following to requirements.txt:

pandas
SQLAlchemy
psycopg2-binary
python-dotenv

Install them:

pip install -r requirements.txt

2. Create the Database and Tables
In your PostgreSQL instance, create a database.

CREATE DATABASE kenya_food_prices;

Connect to the new database and run the SQL script below to set up the star schema.
The UNIQUE constraints are key to preventing duplicate dimension data.

CREATE TABLE dim_date (
    date_id SERIAL PRIMARY KEY,
    date_value DATE NOT NULL UNIQUE,
    year INTEGER NOT NULL,
    month INTEGER NOT NULL,
    day INTEGER NOT NULL
);

CREATE TABLE dim_location (
    location_id SERIAL PRIMARY KEY,
    admin1 VARCHAR(255),
    admin2 VARCHAR(255),
    market VARCHAR(255),
    UNIQUE (admin1, admin2, market)
);

CREATE TABLE dim_commodity (
    commodity_id SERIAL PRIMARY KEY,
    category VARCHAR(255),
    commodity_name VARCHAR(255),
    unit VARCHAR(255),
    UNIQUE (category, commodity_name, unit)
);

CREATE TABLE dim_market_type (
    market_type_id SERIAL PRIMARY KEY,
    market_type VARCHAR(255) UNIQUE
);

CREATE TABLE fact_food_prices (
    price_id SERIAL PRIMARY KEY,
    date_id INTEGER REFERENCES dim_date(date_id),
    location_id INTEGER REFERENCES dim_location(location_id),
    commodity_id INTEGER REFERENCES dim_commodity(commodity_id),
    market_type_id INTEGER REFERENCES dim_market_type(market_type_id),
    price_kes NUMERIC(10, 2),
    price_usd NUMERIC(10, 2),
    UNIQUE (date_id, location_id, commodity_id, market_type_id)
);
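
Optionally, because the dashboard queries will constantly join the fact table to the dimensions, you can index the fact table's foreign key columns (PostgreSQL does not create these indexes automatically; the idx_* names below are arbitrary):

CREATE INDEX idx_fact_date ON fact_food_prices (date_id);
CREATE INDEX idx_fact_location ON fact_food_prices (location_id);
CREATE INDEX idx_fact_commodity ON fact_food_prices (commodity_id);
CREATE INDEX idx_fact_market_type ON fact_food_prices (market_type_id);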

3. Storing Credentials Securely
Create a .env file in the project root to store credentials safely.

DB_HOST=localhost
DB_NAME=kenya_food_prices
DB_USER=your_postgres_user
DB_PASSWORD=your_postgres_password
DB_PORT=5432

We'll use python-dotenv to load these credentials in our Python script.

Step 2: Python ETL Script

Part 1: Extraction and Transformation
First, we use Pandas to read the CSV.
The file has an extra header row that we need to skip, plus messy column names.

import pandas as pd
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine

# Load environment variables
load_dotenv()
db_host = os.getenv("DB_HOST")
db_name = os.getenv("DB_NAME")
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_port = os.getenv("DB_PORT")

conn_string = f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
engine = create_engine(conn_string)

def extraction_and_transformation():
    # Extraction: Read CSV, skipping the second row which is a comment
    df = pd.read_csv('wfp_food_prices_ken.csv', skiprows=[1])

    # Transformation
    df = df.dropna()
    df.columns = df.columns.str.strip() # Clean column names
    df.drop_duplicates(inplace=True)
    df.reset_index(drop=True, inplace=True)

    print("Data extracted and transformed successfully!")
    return df
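
Before wiring up the load steps, a quick sanity check of the transformed dataframe can help (this snippet is just illustrative):

# Quick look at the cleaned data
df = extraction_and_transformation()
print(df.shape)   # rows and columns after cleaning
print(df.head())  # first few records
print(df.dtypes)  # confirm the column types look right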

Part 2: Populating the Database

We make our script idempotent, meaning it can run multiple times without creating duplicate data.

We'll start with the location dimension:

def load_location_dimension(df):
    print("Loading location dimension...")

    # 1. Select unique locations from the source data
    location_cols = ['admin1', 'admin2', 'market']
    locations = df[location_cols].drop_duplicates()

    # 2. Get existing locations from the database
    existing = pd.read_sql("SELECT admin1, admin2, market FROM dim_location", engine)

    # 3. Find locations that are in our source but NOT in the database
    new_locations = locations.merge(existing, on=location_cols, how='left', indicator=True)
    new_locations = new_locations[new_locations['_merge'] == 'left_only'].drop('_merge', axis=1)

    # 4. If there are new locations, append them
    if not new_locations.empty:
        new_locations.to_sql('dim_location', engine, if_exists='append', index=False)
        print(f"Inserted {len(new_locations)} new location records!")
    else:
        print("✔️ No new location records to add.")

Use the same logic for the other dimensions!
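
For example, here's a sketch of the date dimension loader using the same anti-join pattern (the function name is my own; it derives the year, month and day columns that dim_date expects from the source's date column):

def load_date_dimension(df):
    print("Loading date dimension...")

    # Build the unique dates with the attributes dim_date expects
    dates = pd.DataFrame({'date_value': pd.to_datetime(df['date']).dt.date}).drop_duplicates()
    dates['year'] = pd.to_datetime(dates['date_value']).dt.year
    dates['month'] = pd.to_datetime(dates['date_value']).dt.month
    dates['day'] = pd.to_datetime(dates['date_value']).dt.day

    # Anti-join against dates already in the database
    existing = pd.read_sql("SELECT date_value FROM dim_date", engine)
    new_dates = dates.merge(existing, on='date_value', how='left', indicator=True)
    new_dates = new_dates[new_dates['_merge'] == 'left_only'].drop('_merge', axis=1)

    if not new_dates.empty:
        new_dates.to_sql('dim_date', engine, if_exists='append', index=False)
        print(f"Inserted {len(new_dates)} new date records!")
    else:
        print("✔️ No new date records to add.")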

Loading the Fact Table
We replace the descriptive text (e.g., Beans, Nairobi) with the corresponding foreign keys (location_id, commodity_id) from our dimension tables.

We do this through a series of pd.merge calls, the Pandas equivalent of a SQL JOIN.

def load_fact_table(df):
    print("Loading fact table...")

    # Get dimension tables from the DB, now with their generated IDs
    date_dim = pd.read_sql("SELECT date_id, date_value FROM dim_date", engine)
    location_dim = pd.read_sql("SELECT location_id, admin1, admin2, market FROM dim_location", engine)
    commodity_dim = pd.read_sql("SELECT commodity_id, category, commodity_name, unit FROM dim_commodity", engine)
    # ... and so on for other dimensions

    # Prep the dataframe for merging
    fact_df = df.rename(columns={'commodity': 'commodity_name', 'price': 'price_kes',
                                 'usdprice': 'price_usd'})  # 'usdprice' is assumed to be the CSV's USD column
    fact_df['date_value'] = pd.to_datetime(df['date']).dt.date

    # Merge with dimensions to get foreign keys
    fact_df = fact_df.merge(date_dim, on='date_value', how='left')
    fact_df = fact_df.merge(location_dim, on=['admin1', 'admin2', 'market'], how='left')
    fact_df = fact_df.merge(commodity_dim, on=['category', 'commodity_name', 'unit'], how='left')
    # ... merge with other dimensions

    # Select only the columns we need for the fact table
    fact_columns = ['date_id', 'location_id', 'commodity_id', 'market_type_id', 'price_kes', 'price_usd']
    fact_table = fact_df[fact_columns].dropna() # Drop rows where a join failed

    # Load into the database (with a similar idempotency check)
    fact_table.to_sql('fact_food_prices', engine, if_exists='append', index=False)

    print("Fact table loaded successfully!")

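The "similar idempotency check" mentioned in the comment above could look like the sketch below: an anti-join on the fact table's natural key before appending (the helper name is illustrative). A small main block then ties the whole pipeline together.

def filter_existing_facts(fact_table):
    # Keep only rows whose (date, location, commodity, market type) combination
    # is not already in fact_food_prices, so re-runs don't insert duplicates
    key_cols = ['date_id', 'location_id', 'commodity_id', 'market_type_id']
    fact_table = fact_table.copy()
    fact_table[key_cols] = fact_table[key_cols].astype(int)  # IDs become floats after the merges
    existing = pd.read_sql("SELECT date_id, location_id, commodity_id, market_type_id FROM fact_food_prices", engine)
    if existing.empty:
        return fact_table  # nothing loaded yet, everything is new
    merged = fact_table.merge(existing, on=key_cols, how='left', indicator=True)
    return merged[merged['_merge'] == 'left_only'].drop('_merge', axis=1)


if __name__ == "__main__":
    df = extraction_and_transformation()
    load_date_dimension(df)       # sketched earlier
    load_location_dimension(df)
    # ... load the commodity and market type dimensions the same way
    load_fact_table(df)

Inside load_fact_table, call fact_table = filter_existing_facts(fact_table) right before the to_sql line.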

Step 3: Visualizing Data with Grafana

After cleaning and structuring the data, it's ready for Grafana.

1. Connect PostgreSQL to Grafana: In Grafana, go to Connections > Add new connection, select PostgreSQL, and enter your database credentials.

2. Create a New Dashboard: Add a panel, switch to the code editor, and enter your SQL queries.

Here are some queries to get you started.

Query 1: Price of a Commodity Over Time

Panel Type: Time Series

SELECT
  d.date_value AS "time",      -- Alias for Grafana's time axis
  f.price_kes,                 -- The value to plot
  c.commodity_name AS "metric" -- The name of the series
FROM
  fact_food_prices AS f
JOIN
  dim_date AS d ON f.date_id = d.date_id
JOIN
  dim_commodity AS c ON f.commodity_id = c.commodity_id
JOIN
  dim_location AS l ON f.location_id = l.location_id
WHERE
  $__timeFilter(d.date_value) AND -- Use Grafana's time picker
  l.market = 'Nairobi' AND
  c.commodity_name = 'Maize (white)'
ORDER BY
  d.date_value;
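
To avoid hardcoding the market and commodity, you could optionally define Grafana dashboard variables (for example, query variables named market and commodity backed by SELECT DISTINCT market FROM dim_location and SELECT DISTINCT commodity_name FROM dim_commodity) and reference them in the query. A sketch, assuming single-value variables with those names:

SELECT
  d.date_value AS "time",
  f.price_kes,
  c.commodity_name AS "metric"
FROM fact_food_prices AS f
JOIN dim_date AS d ON f.date_id = d.date_id
JOIN dim_commodity AS c ON f.commodity_id = c.commodity_id
JOIN dim_location AS l ON f.location_id = l.location_id
WHERE
  $__timeFilter(d.date_value) AND
  l.market = '${market}' AND
  c.commodity_name = '${commodity}'
ORDER BY d.date_value;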

Query 2: Average Price Comparison by Commodity

Panel Type: Bar Chart

SELECT
  c.commodity_name AS "Commodity",
  AVG(f.price_kes) AS "Average Price (KES)"
FROM
  fact_food_prices AS f
JOIN
  dim_commodity AS c ON f.commodity_id = c.commodity_id
JOIN
  dim_date AS d ON f.date_id = d.date_id
WHERE
  $__timeFilter(d.date_value)
GROUP BY
  c.commodity_name
ORDER BY
  "Average Price (KES)" DESC
LIMIT 15;
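
Query 3 (optional): Year-over-Year Price Change

Panel Type: Table

Since the project is about inflation trends, this extra query computes the year-over-year change in a commodity's average price (re-using the 'Maize (white)' example from Query 1):

WITH yearly AS (
  SELECT
    d.year,
    AVG(f.price_kes) AS avg_price_kes
  FROM fact_food_prices AS f
  JOIN dim_date AS d ON f.date_id = d.date_id
  JOIN dim_commodity AS c ON f.commodity_id = c.commodity_id
  WHERE c.commodity_name = 'Maize (white)'
  GROUP BY d.year
)
SELECT
  year,
  ROUND(avg_price_kes, 2) AS "Average Price (KES)",
  ROUND(100 * (avg_price_kes - LAG(avg_price_kes) OVER (ORDER BY year))
        / LAG(avg_price_kes) OVER (ORDER BY year), 2) AS "YoY Change (%)"
FROM yearly
ORDER BY year;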

We've taken raw data, applied ETL principles, modeled it for analytics, and built an interactive dashboard.
