DEV Community

Rose Wabere

Building a Modern Data Platform to Track Kenya’s Food Prices — A Data Engineering Case Study

Food price volatility has always been a sensitive issue across Kenya. From urban households in Nairobi to smallholder farmers in Turkana, the ripple effects of fluctuating food prices are felt everywhere. In this article, I share my end-to-end journey designing and implementing a data platform that tracks food prices across Kenyan counties, detects inflation patterns, and visualizes trends for policymakers, consumers, and researchers.

This project is both a portfolio centerpiece and a real-world solution — demonstrating how modern data engineering practices can power impactful analytics for the public good.


Project Goals & Context

The main objective:

To design a scalable, automated pipeline that cleans, aggregates, and visualizes food price data from multiple Kenyan data sources, delivering timely, actionable insights.

Key outcomes:

  • Build a unified data model to harmonize market, commodity, and regional data.
  • Automate the ETL process using PySpark.
  • Store processed data in PostgreSQL for analytics.
  • Build dashboards in Grafana for spatial and temporal insights.

High-Level Architecture

[Raw Data Sources]
    |-- WFP HDX Food Prices
    |-- KNBS & FAOSTAT Datasets
    |-- Government Market Surveys
        ↓
[Ingestion Layer]
    pandas 
        ↓
[Processing Layer]
    PySpark Data Cleaning + Normalization
        ↓
[Storage Layer]
    PostgreSQL (Star Schema)
        ↓
[Visualization Layer]
    Grafana Dashboards / Streamlit Web App

This modular setup ensures scalability — you can plug in additional data sources or visualization layers (like Power BI or Superset) without refactoring the pipeline.


Data Ingestion

I used publicly available data from the World Food Programme (WFP) via the Humanitarian Data Exchange (HDX).

Data was fetched dynamically from the remote CSV URL and processed in PySpark for scalability.

Code Example: Reading the Data in PySpark

import requests
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("KenyaFoodPrices") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

# Remote CSV from HDX
csv_url = "https://data.humdata.org/dataset/e0d3fba6-f9a2-45d7-b949-140c455197ff/resource/517ee1bf-2437-4f8c-aa1b-cb9925b9d437/download/wfp_food_prices_ken.csv"

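def fetch_and_load_csv():
    # Download the whole file to the driver and fail fast on HTTP errors
    response = requests.get(csv_url, timeout=60)
    response.raise_for_status()
    data = response.content.decode('utf-8')

    # Split into lines and distribute them; Spark treats the first line as the header
    lines = data.splitlines()
    rdd = spark.sparkContext.parallelize(lines)
    df = spark.read.csv(rdd, header=True, inferSchema=True)
    return df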

# Load into Spark DataFrame
df = fetch_and_load_csv()
df.show(5)

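This approach avoids downloading files manually and fits scheduled jobs (for example, through Airflow or Databricks). Note that the whole file is downloaded into driver memory before it is parallelized, so it suits files of modest size.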
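
One caveat with splitting the payload on newlines is that a quoted field containing a line break would be cut in two. As a minimal sketch (not the project's code), Python's standard csv module can parse the decoded text before it is handed to Spark. The sample text, the function name parse_csv_text, and the rule of skipping a second row whose cells all start with '#' (an HXL-style tag row, which some HDX exports include) are assumptions for illustration.

```python
import csv
import io

def parse_csv_text(text):
    """Parse CSV text into (header, list of row dicts).

    Assumption: a first data row whose non-empty cells all start with '#'
    is a tag row (HXL-style) and is not real data.
    """
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    rows = []
    for i, record in enumerate(reader):
        cells = [c for c in record if c]
        if not cells:
            continue  # skip blank lines
        if i == 0 and all(c.startswith("#") for c in cells):
            continue  # skip the tag row
        rows.append(dict(zip(header, record)))
    return header, rows

# Hypothetical sample, including a quoted field with an embedded newline
sample = (
    "date,market,commodity,price\n"
    "#date,#loc+market,#item+name,#value\n"
    '2024-01-15,"Nairobi\nWakulima",Maize,65.5\n'
    "2024-01-15,Kisumu,Beans,140\n"
)

header, rows = parse_csv_text(sample)
print(len(rows), "data rows parsed")
```

All values come back as strings, so numeric columns such as price would still need casting after the rows are loaded with spark.createDataFrame(rows).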


Data Transformation with PySpark

PySpark handled the heavy ETL logic — particularly useful for large datasets spanning multiple years and counties.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when

spark = SparkSession.builder \
    .appName("KenyaFoodPriceETL") \
    .config("spark.jars", "postgresql-42.6.0.jar") \
    .getOrCreate()

# Load data
raw_df = spark.read.csv("combined_raw.csv", header=True, inferSchema=True)

# Clean commodity and location names, and drop rows with no price
clean_df = raw_df.withColumn("commodity", trim(lower(col("commodity")))) \
                 .withColumn("region", trim(lower(col("region")))) \
                 .filter(col("price").isNotNull())

# Normalize price units (example conversion)
# Only '90kg' and '1kg' are handled here; any other unit falls through unchanged
clean_df = clean_df.withColumn(
    "price_per_kg",
    when(col("unit") == '90kg', col("price") / 90)
    .when(col("unit") == '1kg', col("price"))
    .otherwise(col("price"))
)

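This logic moves the data toward consistency: it standardizes the categorical columns, filters out null prices, and converts 90kg prices to a per-kilogram figure. Units not covered by the example pass through unchanged, so the full pipeline needs a mapping for every unit that appears in the source data.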
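
To illustrate what a fuller unit mapping might look like, here is a small plain-Python sketch. The unit strings, the KG_PER_UNIT table, and the function names are hypothetical; the real dataset may label units differently. Returning None for anything unrecognised (liters, for instance) keeps unconverted prices visible instead of silently treating them as per-kilogram values.

```python
import re

# Hypothetical conversion table: kilograms per one unit of measure
KG_PER_UNIT = {"kg": 1.0, "g": 0.001}

def kg_factor(unit):
    """Return how many kilograms a unit string such as '90 KG' represents,
    or None when the unit is not mass-based or not recognised."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)?\s*([a-zA-Z]+)\s*", unit)
    if not match:
        return None
    quantity = float(match.group(1)) if match.group(1) else 1.0
    per_unit = KG_PER_UNIT.get(match.group(2).lower())
    return None if per_unit is None else quantity * per_unit

def price_per_kg(price, unit):
    """Convert a price quoted for `unit` into a price per kilogram."""
    factor = kg_factor(unit)
    return None if factor is None else price / factor

print(price_per_kg(4500, "90kg"))  # price of a 90kg bag expressed per kg
print(price_per_kg(120, "1 L"))    # volume unit: left unconverted (None)
```

The same rules could be expressed in Spark as chained when() conditions or a small lookup table joined on the unit column.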


Data Modeling: Star Schema Design

To make the data warehouse efficient for analytics, I implemented a star schema model in PostgreSQL.

Fact Table: fact_prices

Column          Description
date_id         Foreign key to date_dim
location_id     Foreign key to location_dim
product_id      Foreign key to product_dim
price_type_id   Retail/Wholesale category
price           Price per KG
quantity        Quantity sold

Dimension Tables:

  • product_dim: commodity, category
  • location_dim: region, district, market, latitude, longitude
  • date_dim: day, month, year
  • price_type_dim: retail/wholesale indicator
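
As a rough sketch of how cleaned rows map onto this schema, the plain-Python example below assigns surrogate keys to distinct dimension values and rewrites each row as a fact record. The sample rows, the helper names, and the yyyymmdd integer convention for date_id are assumptions for illustration, not the project's actual loading code.

```python
from datetime import date

def build_dimension(rows, columns):
    """Assign a surrogate integer key to each distinct combination of `columns`.

    Returns {natural_key_tuple: surrogate_id}, with ids starting at 1.
    """
    keys = sorted({tuple(row[c] for c in columns) for row in rows})
    return {key: surrogate_id for surrogate_id, key in enumerate(keys, start=1)}

def date_key(d):
    """Encode a date as an integer yyyymmdd key (one common convention)."""
    return d.year * 10000 + d.month * 100 + d.day

# Hypothetical cleaned rows
clean_rows = [
    {"date": date(2024, 1, 15), "region": "nairobi", "market": "wakulima",
     "commodity": "maize", "price_per_kg": 50.0},
    {"date": date(2024, 1, 15), "region": "nairobi", "market": "wakulima",
     "commodity": "beans", "price_per_kg": 140.0},
    {"date": date(2024, 2, 15), "region": "kisumu", "market": "kibuye",
     "commodity": "maize", "price_per_kg": 55.0},
]

product_ids = build_dimension(clean_rows, ["commodity"])
location_ids = build_dimension(clean_rows, ["region", "market"])

# Replace natural keys with surrogate keys to get fact_prices-shaped records
fact_rows = [
    {
        "date_id": date_key(r["date"]),
        "product_id": product_ids[(r["commodity"],)],
        "location_id": location_ids[(r["region"], r["market"])],
        "price": r["price_per_kg"],
    }
    for r in clean_rows
]
print(fact_rows[0])
```

In Spark, the equivalent step would typically be a join between the cleaned DataFrame and each dimension table on its natural key.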

Table Creation via SQLAlchemy

Before loading the data from Spark, I pre-created the tables using SQLAlchemy — ensuring primary keys and constraints were properly set.

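from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, ForeignKey
from dotenv import load_dotenv
import os

# Read POSTGRES_URL (a SQLAlchemy-style URL) from a local .env file
load_dotenv()

engine = create_engine(os.getenv("POSTGRES_URL"))
metadata = MetaData()

product_dim = Table('product_dim', metadata,
    Column('product_id', Integer, primary_key=True),
    Column('commodity', String),
    Column('category', String)
)

# location_dim and date_dim must be defined too, or the foreign keys in
# fact_prices cannot be resolved. Columns follow the dimension list above;
# the column types are assumed.
location_dim = Table('location_dim', metadata,
    Column('location_id', Integer, primary_key=True),
    Column('region', String),
    Column('district', String),
    Column('market', String),
    Column('latitude', Float),
    Column('longitude', Float)
)

date_dim = Table('date_dim', metadata,
    Column('date_id', Integer, primary_key=True),
    Column('day', Integer),
    Column('month', Integer),
    Column('year', Integer)
)

fact_prices = Table('fact_prices', metadata,
    Column('price_id', Integer, primary_key=True),
    Column('product_id', Integer, ForeignKey('product_dim.product_id')),
    Column('location_id', Integer, ForeignKey('location_dim.location_id')),
    Column('price', Float),
    Column('date_id', Integer, ForeignKey('date_dim.date_id'))
)

# Create any tables that do not exist yet, in dependency order
metadata.create_all(engine)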

This step ensures referential integrity and avoids schema drift.


Writing Data to PostgreSQL

Once tables were set, PySpark’s JDBC writer handled efficient parallel writes to PostgreSQL.

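import os

# Spark needs a JDBC-style URL (jdbc:postgresql://host:port/dbname), not the SQLAlchemy URL.
# POSTGRES_JDBC_URL is an assumed variable name for that value.
# clean_df must already carry the fact_prices columns (date_id, product_id, location_id, price).
clean_df.write \
    .format("jdbc") \
    .option("url", os.getenv('POSTGRES_JDBC_URL')) \
    .option("dbtable", "fact_prices") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", os.getenv('POSTGRES_USER')) \
    .option("password", os.getenv('POSTGRES_PASSWORD')) \
    .mode("append") \
    .save()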

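Spark writes each DataFrame partition over its own JDBC connection, so the partition count (set with .repartition() or the numPartitions option) controls write parallelism. With sensible partitioning, this approach handled millions of rows.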
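
SQLAlchemy and Spark's JDBC writer expect different URL shapes: SQLAlchemy takes something like postgresql://user:password@host:port/dbname, while JDBC takes jdbc:postgresql://host:port/dbname with credentials passed separately. If only one connection string is kept in the environment, a small helper can derive the JDBC options from it. This is a sketch under that assumption; the function name jdbc_options and the example URL are hypothetical.

```python
from urllib.parse import urlsplit, unquote

def jdbc_options(sqlalchemy_url):
    """Derive Spark JDBC options from a SQLAlchemy-style PostgreSQL URL."""
    parts = urlsplit(sqlalchemy_url)
    host = parts.hostname or "localhost"
    port = parts.port or 5432  # PostgreSQL's default port
    database = parts.path.lstrip("/")
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        # Credentials may be percent-encoded inside a URL, so decode them
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        "driver": "org.postgresql.Driver",
    }

# Hypothetical connection string
opts = jdbc_options("postgresql://etl_user:s3cret@db.example.org:5432/food_prices")
print(opts["url"])
```

The resulting dictionary could then be passed to the writer with .options(**opts) in place of the individual url, user, and password options.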


Visualization with Grafana

Grafana provided a clean, dynamic way to visualize the data — connected directly to PostgreSQL.

Key Dashboards:

  • Price Trend Over Time

    • Monthly price lines by commodity
    • Compare multiple items
  • Latest Prices by Commodity and Region

    • Price comparison by location
    • Can be used to detect inflation by region
  • National Average Price Dashboard

    • Avg price per commodity by latest month
    • Used for cost-of-living analysis

Grafana panels were linked to SQL queries like:

SELECT
    d.year,
    d.month,
    p.commodity,
    AVG(f.price) AS avg_price
FROM fact_prices f
JOIN date_dim d ON f.date_id = d.date_id
JOIN product_dim p ON f.product_id = p.product_id
GROUP BY d.year, d.month, p.commodity
ORDER BY d.year, d.month;
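
A query like this returns monthly averages, and a natural next step for inflation analysis is the month-over-month percentage change. The sketch below shows one way to derive it in plain Python for a single commodity. The function name and the sample figures are hypothetical, and the function does not check for missing months in the series.

```python
def month_over_month(monthly_avg):
    """Compute percentage change between consecutive monthly averages.

    monthly_avg: list of (year, month, avg_price) tuples for one commodity.
    Returns a list of (year, month, pct_change); pct_change is None for the
    first month and whenever the previous price is zero.
    """
    changes = []
    previous = None
    for year, month, price in sorted(monthly_avg):
        if previous is None or previous == 0:
            pct = None
        else:
            pct = round((price - previous) / previous * 100, 2)
        changes.append((year, month, pct))
        previous = price
    return changes

# Hypothetical monthly averages for one commodity
series = [(2024, 1, 50.0), (2024, 2, 55.0), (2024, 3, 52.25)]
for year, month, pct in month_over_month(series):
    print(year, month, pct)
```

The same calculation could also be pushed into PostgreSQL with a LAG() window function so that Grafana reads the change directly.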

Challenges & Lessons Learned

  1. Inconsistent Units: Prices were listed as 90kg bags, 1kg tins, or liters. Unit normalization required extensive mapping logic.
  2. Duplicate Commodity Names: Variants like “Beans (Dry)” vs “Dry Beans” — solved with fuzzy matching.
  3. Spatial Mapping: Used latitude/longitude data to create a geographic layer for Grafana maps.
  4. Performance Tuning: Spark’s .repartition() and broadcast joins improved load times.
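
To make the duplicate-name problem concrete, here is a minimal sketch of one possible matching approach using only Python's standard library. It is not necessarily the method used in the project: names are normalized by lowercasing, stripping punctuation, and sorting tokens, then compared with difflib. The canonical list, function names, and the 0.8 cutoff are assumptions for illustration.

```python
import difflib
import re

def normalize_name(name):
    """Lowercase, drop punctuation, and sort tokens so word order is ignored."""
    tokens = re.findall(r"[a-z0-9]+", name.lower())
    return " ".join(sorted(tokens))

def match_commodity(name, canonical_names, cutoff=0.8):
    """Return the canonical name closest to `name`, or None if nothing is close."""
    lookup = {normalize_name(c): c for c in canonical_names}
    hits = difflib.get_close_matches(
        normalize_name(name), list(lookup), n=1, cutoff=cutoff
    )
    return lookup[hits[0]] if hits else None

# Hypothetical canonical commodity list
canonical = ["Dry Beans", "Maize Flour", "Rice"]

print(match_commodity("Beans (Dry)", canonical))  # word order and brackets ignored
print(match_commodity("Sugar", canonical))        # no close match
```

Sorting tokens handles reordered variants exactly, while the similarity cutoff catches small spelling differences; unmatched names return None so they can be reviewed by hand.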

Real-World Impact

  • Enables WFP or local governments to monitor inflation regionally
  • Helps farmers and consumers understand price dynamics
  • Open to integrating other indicators (e.g., rainfall, NDVI)
  • Ready for real-time (Kafka + Debezium integration possible)

Next Steps

  • Integrate Apache Airflow for automated ETL scheduling.
  • Use Prophet or ARIMA models to forecast inflation.
  • Add Kafka + Debezium CDC for real-time updates.
  • Expand visualizations with weather and trade data overlays.

Takeaways

This project blends engineering rigor with social impact — using open data to empower informed decisions. It demonstrates proficiency in:

  • Distributed data processing (PySpark)
  • SQL and data modeling best practices
  • Workflow orchestration (Airflow-ready design)
  • Visualization and analytics integration (Grafana)

It’s a strong addition to any data engineering portfolio — especially for roles focusing on public data, infrastructure, or analytics engineering.


Links

GitHub: github.com/Rozieroz/Kenyan-Food-Price-Tracker-and-Inflation-Analysis

Keywords: Kenya food prices, PySpark ETL, PostgreSQL data warehouse, Grafana dashboard, data engineering portfolio, Airflow pipeline, public data analytics, inflation analysis Kenya
