Food price volatility has always been a sensitive issue across Kenya. From urban households in Nairobi to smallholder farmers in Turkana, the ripple effects of fluctuating food prices are felt everywhere. In this article, I share my end-to-end journey designing and implementing a data platform that tracks food prices across Kenyan counties, detects inflation patterns, and visualizes trends for policymakers, consumers, and researchers.
This project is both a portfolio centerpiece and a real-world solution — demonstrating how modern data engineering practices can power impactful analytics for the public good.
Project Goals & Context
The main objective:
To design a scalable, automated pipeline that cleans, aggregates, and visualizes food price data from multiple Kenyan data sources — delivering real-time, actionable insights.
Key outcomes:
- Build a unified data model to harmonize market, commodity, and regional data.
- Automate the ETL process using PySpark.
- Store processed data in PostgreSQL for analytics.
- Build dashboards in Grafana for spatial and temporal insights.
High-Level Architecture
[Raw Data Sources]
  |-- WFP HDX Food Prices
  |-- KNBS & FAOSTAT Datasets
  |-- Government Market Surveys
        ↓
[Ingestion Layer]
  requests / pandas (remote CSV fetch)
        ↓
[Processing Layer]
  PySpark Data Cleaning + Normalization
        ↓
[Storage Layer]
  PostgreSQL (Star Schema)
        ↓
[Visualization Layer]
  Grafana Dashboards / Streamlit Web App
This modular setup ensures scalability — you can plug in additional data sources or visualization layers (like Power BI or Superset) without refactoring the pipeline.
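To make that pluggability concrete, here is a minimal, hypothetical sketch of a configuration-driven source registry; the SOURCES mapping and fetch_csv helper are illustrative assumptions, not code from the actual repository.

import requests

# Hypothetical source registry: adding a new data source means adding an entry
# here rather than changing the downstream Spark or PostgreSQL stages.
SOURCES = {
    "wfp_hdx": "https://data.humdata.org/dataset/e0d3fba6-f9a2-45d7-b949-140c455197ff/resource/517ee1bf-2437-4f8c-aa1b-cb9925b9d437/download/wfp_food_prices_ken.csv",
    # "knbs": "<KNBS dataset URL>",
    # "faostat": "<FAOSTAT dataset URL>",
}

def fetch_csv(source_name: str) -> str:
    """Download one registered source and return its raw CSV text."""
    response = requests.get(SOURCES[source_name], timeout=60)
    response.raise_for_status()
    return response.text

if __name__ == "__main__":
    print(fetch_csv("wfp_hdx").splitlines()[0])  # header row of the WFP feed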
Data Ingestion
I used publicly available data from the World Food Programme (WFP) via the Humanitarian Data Exchange (HDX).
Data was fetched dynamically from the remote CSV URL and processed in PySpark for scalability.
Code Example: Reading the Data in PySpark
import requests
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("KenyaFoodPrices") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

# Remote CSV from HDX
csv_url = "https://data.humdata.org/dataset/e0d3fba6-f9a2-45d7-b949-140c455197ff/resource/517ee1bf-2437-4f8c-aa1b-cb9925b9d437/download/wfp_food_prices_ken.csv"

def fetch_and_load_csv():
    response = requests.get(csv_url)
    data = response.content.decode('utf-8')
    # Split into lines and parallelize them so Spark can parse the CSV text
    lines = data.splitlines()
    rdd = spark.sparkContext.parallelize(lines)
    df = spark.read.csv(rdd, header=True, inferSchema=True)
    return df

# Load into Spark DataFrame
df = fetch_and_load_csv()
df.show(5)
This approach avoids downloading files manually and scales well in scheduled jobs (for example, through Airflow or Databricks).
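As an illustration of that scheduling, the sketch below wraps the fetch-and-load step in a minimal Airflow DAG. It assumes Airflow 2.4+ and a hypothetical run_food_price_etl() entry point; the DAG id and weekly schedule are placeholders, not the project's actual orchestration code.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def run_food_price_etl():
    # Placeholder: call the fetch, clean, and load steps shown in this article.
    ...

with DAG(
    dag_id="kenya_food_prices_etl",      # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule="@weekly",                  # adjust to how often the source data updates
    catchup=False,
) as dag:
    PythonOperator(
        task_id="fetch_clean_load",
        python_callable=run_food_price_etl,
    )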
Data Transformation with PySpark
PySpark handled the heavy ETL logic — particularly useful for large datasets spanning multiple years and counties.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, lower, when

spark = SparkSession.builder \
    .appName("KenyaFoodPriceETL") \
    .config("spark.jars", "postgresql-42.6.0.jar") \
    .getOrCreate()

# Load data
raw_df = spark.read.csv("combined_raw.csv", header=True, inferSchema=True)

# Clean commodity and location names
clean_df = raw_df.withColumn("commodity", trim(lower(col("commodity")))) \
    .withColumn("region", trim(lower(col("region")))) \
    .filter(col("price").isNotNull())

# Normalize price units (example conversion)
clean_df = clean_df.withColumn(
    "price_per_kg",
    when(col("unit") == '90kg', col("price") / 90)
    .when(col("unit") == '1kg', col("price"))
    .otherwise(col("price"))
)
This logic ensures consistency — converting all prices to a comparable unit (per kilogram), filtering out nulls, and standardizing categorical columns.
Data Modeling: Star Schema Design
To make the data warehouse efficient for analytics, I implemented a star schema model in PostgreSQL.
Fact Table: fact_prices
| Column | Description |
|---|---|
| date_id | Foreign key to date_dim |
| location_id | Foreign key to location_dim |
| product_id | Foreign key to product_dim |
| price_type_id | Retail/Wholesale category |
| price | Price per KG |
| quantity | Quantity sold |
Dimension Tables:
- product_dim: commodity, category
- location_dim: region, district, market, latitude, longitude
- date_dim: day, month, year
- price_type_dim: retail/wholesale indicator
Table Creation via SQLAlchemy
Before loading the data from Spark, I pre-created the tables using SQLAlchemy — ensuring primary keys and constraints were properly set.
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, Date, ForeignKey
from dotenv import load_dotenv
import os

load_dotenv()
engine = create_engine(os.getenv("POSTGRES_URL"))
metadata = MetaData()

product_dim = Table('product_dim', metadata,
    Column('product_id', Integer, primary_key=True),
    Column('commodity', String),
    Column('category', String)
)

# The referenced dimensions must exist in the metadata before fact_prices is created
location_dim = Table('location_dim', metadata,
    Column('location_id', Integer, primary_key=True),
    Column('region', String), Column('district', String), Column('market', String),
    Column('latitude', Float), Column('longitude', Float)
)

date_dim = Table('date_dim', metadata,
    Column('date_id', Integer, primary_key=True),
    Column('day', Integer), Column('month', Integer), Column('year', Integer)
)

fact_prices = Table('fact_prices', metadata,
    Column('price_id', Integer, primary_key=True),
    Column('product_id', Integer, ForeignKey('product_dim.product_id')),
    Column('location_id', Integer, ForeignKey('location_dim.location_id')),
    Column('price', Float),
    Column('date_id', Integer, ForeignKey('date_dim.date_id'))
)

metadata.create_all(engine)
This step ensures referential integrity and avoids schema drift.
Writing Data to PostgreSQL
Once tables were set, PySpark’s JDBC writer handled efficient parallel writes to PostgreSQL.
# Note: Spark's JDBC writer expects the URL in the jdbc:postgresql://host:port/db form
clean_df.write \
    .format("jdbc") \
    .option("url", os.getenv('POSTGRES_URL')) \
    .option("dbtable", "fact_prices") \
    .option("user", os.getenv('POSTGRES_USER')) \
    .option("password", os.getenv('POSTGRES_PASSWORD')) \
    .mode("append") \
    .save()
With proper partitioning, this approach efficiently handled millions of rows.
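For reference, a repartitioned version of the same write could look like the sketch below; the partition count and batch size are illustrative assumptions to tune against your cluster and database, not values taken from the project.

# Repartition so Spark opens several parallel JDBC connections to PostgreSQL.
(clean_df
    .repartition(8)                          # assumed number of parallel writer tasks
    .write
    .format("jdbc")
    .option("url", os.getenv("POSTGRES_URL"))
    .option("dbtable", "fact_prices")
    .option("user", os.getenv("POSTGRES_USER"))
    .option("password", os.getenv("POSTGRES_PASSWORD"))
    .option("batchsize", 10000)              # assumed rows per JDBC batch insert
    .mode("append")
    .save())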
Visualization with Grafana
Grafana provided a clean, dynamic way to visualize the data — connected directly to PostgreSQL.
Key Dashboards:
- Price Trend Over Time
  - Monthly price lines by commodity
  - Compare multiple items
- Latest Prices by Commodity and Region
  - Price comparison by location
  - Can be used to detect inflation by region
- National Average Price Dashboard
  - Avg price per commodity for the latest month
  - Used for cost-of-living analysis
Grafana panels were linked to SQL queries like:
SELECT
    d.year,
    d.month,
    p.commodity,
    AVG(f.price) AS avg_price
FROM fact_prices f
JOIN date_dim d ON f.date_id = d.date_id
JOIN product_dim p ON f.product_id = p.product_id
GROUP BY d.year, d.month, p.commodity
ORDER BY d.year, d.month;
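To turn those monthly averages into an inflation-style signal, a small post-processing step can compute year-over-year percentage change per commodity. The sketch below is an illustrative pandas example built on the same query; it is not part of the existing Grafana dashboards.

import os

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(os.getenv("POSTGRES_URL"))

# Monthly average price per commodity, reusing the query from above
monthly = pd.read_sql(
    """
    SELECT d.year, d.month, p.commodity, AVG(f.price) AS avg_price
    FROM fact_prices f
    JOIN date_dim d ON f.date_id = d.date_id
    JOIN product_dim p ON f.product_id = p.product_id
    GROUP BY d.year, d.month, p.commodity
    """,
    engine,
)

# Year-over-year change: compare each month to the same month one year earlier
monthly = monthly.sort_values(["commodity", "year", "month"])
monthly["yoy_pct_change"] = (
    monthly.groupby(["commodity", "month"])["avg_price"].pct_change() * 100
)
print(monthly.dropna(subset=["yoy_pct_change"]).tail())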
Challenges & Lessons Learned
- Inconsistent Units: Prices were listed as 90kg bags, 1kg tins, or liters. Unit normalization required extensive mapping logic.
- Duplicate Commodity Names: Variants like “Beans (Dry)” vs “Dry Beans”, solved with fuzzy matching (a sketch follows this list).
- Spatial Mapping: Used latitude/longitude data to create a geographic layer for Grafana maps.
- Performance Tuning: Spark’s .repartition() and broadcast joins improved load times.
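Here is a minimal sketch of the kind of matching that can collapse such commodity-name variants, using Python's standard-library difflib; the canonical name list and similarity cutoff are illustrative assumptions rather than the project's exact mapping logic.

import difflib
import re

# Canonical commodity names the pipeline standardizes on (illustrative list)
CANONICAL = ["beans (dry)", "maize (white)", "rice (aromatic)"]

def _token_key(name: str) -> str:
    """Lower-case, strip punctuation, and sort tokens: 'Beans (Dry)' -> 'beans dry'."""
    tokens = re.findall(r"[a-z0-9]+", name.lower())
    return " ".join(sorted(tokens))

# Pre-computed token keys for the canonical names
CANONICAL_KEYS = {_token_key(c): c for c in CANONICAL}

def canonical_commodity(name: str, cutoff: float = 0.8) -> str:
    """Map a raw label to its canonical name via token matching, with a fuzzy fallback."""
    key = _token_key(name)
    if key in CANONICAL_KEYS:                 # exact token match handles reordered words
        return CANONICAL_KEYS[key]
    close = difflib.get_close_matches(key, CANONICAL_KEYS.keys(), n=1, cutoff=cutoff)
    return CANONICAL_KEYS[close[0]] if close else name

print(canonical_commodity("Dry Beans"))       # -> 'beans (dry)'
print(canonical_commodity("Beans (Dry)"))     # -> 'beans (dry)'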
Real-World Impact
- Enables WFP or local governments to monitor inflation regionally
- Helps farmers and consumers understand price dynamics
- Open to integrating other indicators (e.g., rainfall, NDVI)
- Ready for real-time (Kafka + Debezium integration possible)
Next Steps
- Integrate Apache Airflow for automated ETL scheduling.
- Use Prophet or ARIMA models to forecast inflation (see the sketch after this list).
- Add Kafka + Debezium CDC for real-time updates.
- Expand visualizations with weather and trade data overlays.
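To give a feel for the forecasting step, here is a minimal Prophet sketch over monthly average prices pulled from the warehouse. The query, the 'maize' filter, and the 12-month horizon are illustrative assumptions, not an implemented part of the project.

import os

import pandas as pd
from prophet import Prophet
from sqlalchemy import create_engine

engine = create_engine(os.getenv("POSTGRES_URL"))

# Monthly national average price for one example commodity; Prophet expects 'ds' and 'y'
history = pd.read_sql(
    """
    SELECT make_date(d.year, d.month, 1) AS ds, AVG(f.price) AS y
    FROM fact_prices f
    JOIN date_dim d ON f.date_id = d.date_id
    JOIN product_dim p ON f.product_id = p.product_id
    WHERE p.commodity = 'maize'
    GROUP BY 1
    ORDER BY 1
    """,
    engine,
)

model = Prophet()
model.fit(history)

# Forecast the next 12 months of average prices
future = model.make_future_dataframe(periods=12, freq="MS")
forecast = model.predict(future)
print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail())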
Takeaways
This project blends engineering rigor with social impact — using open data to empower informed decisions. It demonstrates proficiency in:
- Distributed data processing (PySpark)
- SQL and data modeling best practices
- Workflow orchestration (Airflow-ready design)
- Visualization and analytics integration (Grafana)
It’s a strong addition to any data engineering portfolio — especially for roles focusing on public data, infrastructure, or analytics engineering.
Links
GitHub: github.com/Rozieroz/Kenyan-Food-Price-Tracker-and-Inflation-Analysis
Keywords: Kenya food prices, PySpark ETL, PostgreSQL data warehouse, Grafana dashboard, data engineering portfolio, Airflow pipeline, public data analytics, inflation analysis Kenya
