Rising food prices continue to be a critical issue affecting many households in Kenya. As a data engineer passionate about impactful projects, I built a data pipeline to collect, model, and analyze food price and inflation trends in Kenya, shedding light on various patterns from 2006 to 2024.
By building this pipeline, we can:
- Monitor inflation's impact on staple food prices in various areas.
- Provide data-backed insights to policymakers, NGOs, and supply chain analysts.
- Improve food security planning through trend forecasting.
Project Objectives
To build a scalable data pipeline and star-schema data warehouse to analyze historical food prices across Kenyan markets and derive insights into inflation trends, particularly for food categories such as vegetables, fruits, meat and eggs, and milk and dairy, among others.
The Github repository for this project can be accessed here
Tech Stack Used
- Data Source, Food Prices: Humanitarian Data Exchange (HDX)
- Data Source, Inflation: World Bank Indicator API v2
- Pipeline: Python + pandas for extraction & transformation
- Database: PostgreSQL with dimensional modeling
- Orchestration and Automation: Apache Airflow
- Visualization: Grafana
- Slideshow presentation: MS PowerPoint
Project Architecture
The flowchart below shows the project architecture flow from start to finish, with the tech stack used in every step.
Database Architecture
The database design takes the form of a star schema: one central fact table surrounded by dimension tables. The fact table contains the foreign keys that map to the dimension tables, along with the measurements. Dimension tables contain the attributes, or units of analysis, of the data. For more information on different schemas, visit this blog to learn more.
The database structure diagram below shows how the schema is set up.
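To make the design concrete in code as well, below is a minimal sketch of what the schema DDL could look like. The dim_market table mirrors the loading code later in this post; the other table and column names are illustrative assumptions, not the project's exact schema.

import os
from sqlalchemy import create_engine, text

# Illustrative DDL: dim_market mirrors the loading code below; the other
# names are assumptions about the project's schema
DDL = """
CREATE TABLE IF NOT EXISTS foodprices.dim_market (
    market_id  INT PRIMARY KEY,
    province   TEXT,
    county     TEXT,
    latitude   FLOAT,
    longitude  FLOAT
);

CREATE TABLE IF NOT EXISTS foodprices.dim_commodity (
    commodity_id  INT PRIMARY KEY,
    commodity     TEXT,
    category      TEXT
);

-- The fact table holds foreign keys into the dimensions plus the measurements
CREATE TABLE IF NOT EXISTS foodprices.fact_food_prices (
    date          DATE,
    market_id     INT REFERENCES foodprices.dim_market (market_id),
    commodity_id  INT REFERENCES foodprices.dim_commodity (commodity_id),
    price         NUMERIC,
    usdprice      NUMERIC
);
"""

engine = create_engine(os.getenv("PSQL_URI"))
with engine.begin() as conn:
    conn.execute(text(DDL))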
Pipeline Overview
This project follows a batch data pipeline that extracts food price and inflation data from APIs, transforms it for consistency, and loads it into a PostgreSQL database for analysis and downstream use cases.
a. Data Extraction
Monthly food price and inflation data is downloaded in CSV format from the relevant APIs.
Only relevant data is retained, such as geodata (latitude, longitude), commodities, and dates (a column-selection sketch follows the extraction snippet below).
Below is an example code snippet that extracts data from the HDX Kenya dataset.
import requests
import pandas as pd
from io import StringIO

def extract_data():
    # Download the WFP food prices CSV for Kenya from HDX
    url = "https://data.humdata.org/dataset/e0d3fba6-f9a2-45d7-b949-140c455197ff/resource/517ee1bf-2437-4f8c-aa1b-cb9925b9d437/download/wfp_food_prices_ken.csv"
    response = requests.get(url)
    if response.status_code == 200:
        df = pd.read_csv(StringIO(response.text))
        return df
    else:
        print(f"Error loading data from CSV file: {response.status_code}, {response.text}")
b. Data Transformation
Below are some of the transformations applied to the data:
- Clean inconsistent units and datatypes, e.g. kgs, liters, floats.
- Normalize market names and commodity labels.
- Fill in missing dates and reformat for time-series alignment (see the sketch after this list).
- Filter out years with no data, e.g. 2006–2018 for the vegetables category.
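For the date-alignment step in the list above, one possible approach (a sketch, not the project's exact code) is to resample each commodity's price series to a monthly frequency so missing months become explicit:

import pandas as pd

def align_monthly(df):
    # Average duplicate observations within a month, then resample so that
    # months with no data appear explicitly as NaN rows
    monthly = (
        df.set_index("date")
          .groupby("commodity_id")["price"]
          .resample("MS")  # month-start frequency
          .mean()
          .reset_index()
    )
    return monthly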
Below is a code snippet for transforming food data:
def transform_data(df):
    try:
        # Drop the first row (the HXL hashtag row in WFP's CSV)
        df.drop(0, axis=0, inplace=True)
        # Enforce consistent datatypes
        df["date"] = pd.to_datetime(df["date"])
        df["latitude"] = df["latitude"].astype(float)
        df["longitude"] = df["longitude"].astype(float)
        df["price"] = df["price"].astype(float)
        df["usdprice"] = df["usdprice"].astype(float)
        df["market_id"] = df["market_id"].astype(int)
        df["commodity_id"] = df["commodity_id"].astype(int)
        # Rename administrative levels to their Kenyan equivalents
        df.rename(columns={'admin1': 'province', 'admin2': 'county'}, inplace=True)
        # Drop rows with any missing values
        df.dropna(axis=0, how='any', inplace=True)
        return df
    except Exception as e:
        print(f"Error transforming dataframe: {e}")
c. Loading data into a PostgreSQL database
After transformation, our data is ready for loading into the database. The dimension tables are loaded first, then the fact table, so that the fact table's foreign keys always reference existing dimension rows (a sketch of the fact-table load follows the snippet below).
Below is a code snippet for loading market data into the market dimension table.
import os
from sqlalchemy import create_engine

def loading_market_data(clean_df):
    try:
        # Build the market dimension: one row per unique market
        market_df = clean_df[['market_id', 'province', 'county', 'latitude', 'longitude']].drop_duplicates()
        market_df.dropna(axis=0, how='any', inplace=True)
        engine = create_engine(os.getenv("PSQL_URI"))
        try:
            market_df.to_sql('dim_market', con=engine, index=False, schema='foodprices', if_exists='append')
            print("Data loaded into PostgreSQL successfully")
        except Exception as e:
            print(f"Error loading into dim_market table: {e}")
    except Exception as e:
        print(f"Error loading clean dataframe from previous task: {e}")
d. Workflow Automation using Airflow
Airflow DAGs are used to schedule the pipeline to run monthly.
Each task is logged, and failures are handled through retries and error logging.
Logs are also tracked to preserve data lineage across the pipeline.
Airflow is then deployed to production on an Azure Virtual Machine.
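The retry behaviour mentioned above is configured through the DAG's default_args dictionary, which the DAG below references. A minimal example (the specific values are assumptions, not the project's exact settings):

from datetime import timedelta

# Assumed retry settings: tune retries and retry_delay for your environment
default_args = {
    "owner": "airflow",
    "retries": 3,                         # re-run a failed task up to 3 times
    "retry_delay": timedelta(minutes=5),  # wait 5 minutes between attempts
    "email_on_failure": False,            # notifications handled by the email task
}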
Below is an example of the DAG used to automate the inflation data pipeline:
import os
import smtplib
from datetime import datetime
from email.mime.text import MIMEText

import pandas as pd
import requests
from airflow.decorators import dag, task
from sqlalchemy import create_engine

@dag(
    dag_id='inflation_pipeline_dag',
    default_args=default_args,
    start_date=datetime(2025, 7, 19),
    schedule_interval='@monthly',
    catchup=False,
)
def inflation_pipeline_dag():

    @task
    def extract_data():
        # Pull Kenya's CPI series (2006-2024) from the World Bank Indicator API
        url = "https://api.worldbank.org/v2/country/ke/indicator/FP.CPI.TOTL?format=json&date=2006:2024"
        inflation_data = []
        response = requests.get(url)
        if response.status_code == 200:
            # The API returns [metadata, records]; the records sit at index 1
            cpi_data = response.json()
            for record in cpi_data[1]:
                inflation_data.append(
                    {
                        'year': record['date'],
                        'value': record['value'],
                        'indicator': record['indicator']['value']
                    }
                )
        else:
            print(f"Error extracting CPI data: {response.status_code}, {response.text}")
        df = pd.DataFrame(inflation_data)
        return df

    @task
    def transform_data(df):
        # Parse the year string into a proper datetime for time-series queries
        df['year'] = pd.to_datetime(df['year'], format='%Y')
        return df

    @task
    def load_inflation_data(clean_df):
        engine = create_engine(os.getenv("PSQL_URI"))
        try:
            clean_df.to_sql('inflation_data', con=engine, schema='inflation', index=False, if_exists='append')
            print("Data loaded into inflation_data successfully!")
        except Exception as e:
            print(f"Error loading into inflation_data: {e}")

    @task
    def send_email():
        # Notify the recipient that the pipeline run completed
        body = "Inflation pipeline DAG has run successfully, please check it out"
        subject = "Inflation Pipeline DAG Notification"
        sender = os.getenv("SENDER")
        recipients = [os.getenv('RECIPIENT')]
        pwd = os.getenv("PASSWORD")
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = sender
        msg["To"] = recipients[0]
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server:
            try:
                smtp_server.login(sender, pwd)
                smtp_server.sendmail(sender, recipients, msg.as_string())
                print("Email sent successfully!")
            except Exception as e:
                print(f"Error sending email to recipient: {e}")

    # Wire up the task dependencies
    df = extract_data()
    clean_df = transform_data(df)
    loading = load_inflation_data(clean_df)
    email_sent = send_email()
    loading >> email_sent

inflation = inflation_pipeline_dag()
e. Visualization and analysis
Transformed data is analyzed in Grafana.
Trends like monthly price inflation, food price differences per county, and the average price change of different food categories over time are tracked.
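The same trends can also be pulled straight from the warehouse with pandas. The query below is a hedged example of the kind of aggregation a Grafana panel might run; the fact and dimension table names are assumptions carried over from the schema sketch earlier.

import os
import pandas as pd
from sqlalchemy import create_engine

# Average monthly price per food category (table names assumed)
QUERY = """
SELECT date_trunc('month', f.date) AS month,
       c.category,
       AVG(f.price) AS avg_price
FROM foodprices.fact_food_prices f
JOIN foodprices.dim_commodity c USING (commodity_id)
GROUP BY 1, 2
ORDER BY 1, 2;
"""

engine = create_engine(os.getenv("PSQL_URI"))
trends = pd.read_sql(QUERY, engine)
print(trends.head())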
Below is a snapshot of the Grafana dashboard. To access the dashboard, follow this link
Insights
For most food categories, prices spiked from early 2022 through late 2023. This may have been driven by several factors, including:
- Currency depreciation: As the inflation chart shows, inflation in Kenya quadrupled between 2020 and 2023, which likely contributed to the spike in food prices around 2022.
- Drought and other climatic conditions: In 2022, Kenya experienced below-average rainfall, particularly in the northeastern regions, which suffered from prolonged drought conditions. The year was also marked by significant temperature increases, making it one of the warmest on record. This likely reduced food production and raised food prices, especially in arid and semi-arid areas.
- High input costs: The Russia-Ukraine war disrupted several industries, notably the fuel industry, a key input to the agricultural sector since foodstuffs must be transported from farms to markets, which drove food prices up further.
Challenges faced while building this pipeline
a. Missing data
Some food categories and commodities lacked data for certain time periods; for example, the vegetables and fruits categories had no data from 2006 to 2018.
b. Data Normalization
Inconsistent data had to be normalized into consistent units of mass, price, etc.
Some commodities were quoted in kilograms and others in 50-kilogram bags, so the data had to be normalized for easier analysis (see the sketch below).
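As an illustration, a price quoted per 50-kilogram bag can be converted to a per-kilogram price before comparison. This is only a sketch, and the unit labels are assumptions about how the dataset encodes them.

import re

def price_per_kg(price, unit):
    # Convert a price quoted per N-kg bag (e.g. "50 KG") to a per-kg price
    match = re.match(r"(\d+(?:\.\d+)?)\s*KG$", unit.strip(), flags=re.IGNORECASE)
    if match:
        return price / float(match.group(1))
    if unit.strip().upper() == "KG":
        return price  # already per kilogram
    return None  # not a mass unit; handle separately

# Example: price_per_kg(4500, "50 KG") -> 90.0 per kg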
c. Lack of data sources
Data sources were lacking for some Kenyan counties and commodities, so the analysis was limited to the commodities with available data.
Conclusion
This project was a deep dive into how data engineering can be used for social impact. If you're working on something similar or want to collaborate, feel free to connect!
Also, like, comment and follow me for more data engineering content.