Denzel Kanyeki

Using Data Engineering to Track Food Prices and Inflation in Kenya from 2006 to 2025

Rising food prices continue to be a critical issue affecting many households in Kenya. As a data engineer passionate about impactful projects, I built a data pipeline to collect, model, and analyze food price and inflation trends in Kenya, shedding light on patterns from 2006 to 2025.

By building this pipeline, we can:

  • Monitor inflation’s impact on staple food prices in various areas.

  • Provide data-backed insights to policymakers, NGOs, and supply chain analysts.

  • Improve food security planning through trend forecasting.

Project Objectives

To build a scalable data pipeline and star-schema data warehouse for analyzing historical food prices across Kenyan markets and deriving insights into inflation trends, particularly for food categories such as vegetables, fruits, meat and eggs, and milk and dairy, among others.

The GitHub repository for this project can be accessed here.

Tech Stack Used

  • Data Source, Food Prices: Humanitarian Data Exchange (HDX)
  • Data Source, Inflation: World Bank Indicator API v2
  • Pipeline: Python + pandas for extraction & transformation
  • Database: PostgreSQL with dimensional modeling
  • Orchestration and Automation: Apache Airflow
  • Visualization: Grafana
  • Slideshow presentation: MS PowerPoint

Project Architecture

The flowchart below shows the project architecture flow from start to finish, with the tech stack used in every step.

Project Architecture Diagram

Database Architecture

The database design takes the form of a star schema, with one central fact table and several dimension tables. The fact table contains the foreign keys that map to specific dimension tables, together with the measurements. Dimension tables contain the attributes, or units of analysis, of the data. For more information on different schemas, visit this blog to learn more.

The database structure diagram below shows how the schema is set up.

Database Diagram
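
To make the design concrete, below is a minimal sketch of what the DDL behind such a star schema could look like. The dim_market columns mirror the loading code later in this post; the dim_commodity and fact_food_prices names and columns are assumptions for illustration, not the repository's exact DDL.

import os
from sqlalchemy import create_engine, text

# Sketch of the star schema; table names other than dim_market are assumed
DDL = """
CREATE SCHEMA IF NOT EXISTS foodprices;

CREATE TABLE IF NOT EXISTS foodprices.dim_market (
    market_id INT PRIMARY KEY,
    province  TEXT,
    county    TEXT,
    latitude  DOUBLE PRECISION,
    longitude DOUBLE PRECISION
);

CREATE TABLE IF NOT EXISTS foodprices.dim_commodity (
    commodity_id INT PRIMARY KEY,
    commodity    TEXT,
    category     TEXT,
    unit         TEXT
);

CREATE TABLE IF NOT EXISTS foodprices.fact_food_prices (
    price_id     SERIAL PRIMARY KEY,
    market_id    INT REFERENCES foodprices.dim_market (market_id),
    commodity_id INT REFERENCES foodprices.dim_commodity (commodity_id),
    date         DATE,
    price        NUMERIC,  -- price in KES
    usdprice     NUMERIC   -- price in USD
);
"""

engine = create_engine(os.getenv("PSQL_URI"))
with engine.begin() as conn:
    conn.execute(text(DDL))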

Pipeline Overview

This project follows a batch data pipeline that extracts food price and inflation data from APIs, transforms it for consistency, and loads it into a PostgreSQL database for analysis and downstream use cases.

a. Data Extraction

  • Monthly food price and inflation data is downloaded in CSV format from the relevant APIs.

  • Only relevant fields are retained, such as geodata (latitude, longitude), commodities, and dates.

Below is a code snippet that extracts the Kenya food price data from HDX.

import requests
import pandas as pd
from io import StringIO

def extract_data():
    # WFP food prices for Kenya, published on HDX as a CSV resource
    url = "https://data.humdata.org/dataset/e0d3fba6-f9a2-45d7-b949-140c455197ff/resource/517ee1bf-2437-4f8c-aa1b-cb9925b9d437/download/wfp_food_prices_ken.csv"

    response = requests.get(url)

    if response.status_code == 200:
        # Parse the CSV payload straight from memory
        df = pd.read_csv(StringIO(response.text))
        return df
    else:
        print(f"Error loading data from CSV file: {response.status_code}, {response.text}")
        return None

b. Data Transformation

Below are some of the transformations applied to the data:

  • Clean inconsistent units and datatypes, e.g. kgs, liters, floats.

  • Normalize market names and commodity labels.

  • Fill in missing dates and reformat for time-series alignment.

  • Filter out years with no data, e.g., 2006–2018 for the vegetables category.

Below is a code snippet for transforming food data:

def transform_data(df):
    try:
        # Drop the first row, which holds HXL hashtags rather than data
        df.drop(0, axis=0, inplace=True)
        df["date"] = pd.to_datetime(df["date"])

        # Cast numeric columns to their proper datatypes
        df["latitude"] = df["latitude"].astype(float)
        df["longitude"] = df["longitude"].astype(float)
        df["price"] = df["price"].astype(float)
        df["usdprice"] = df["usdprice"].astype(float)
        df["market_id"] = df["market_id"].astype(int)
        df["commodity_id"] = df["commodity_id"].astype(int)

        # Rename the generic admin levels to Kenyan administrative units
        df.rename(columns={'admin1': 'province', 'admin2': 'county'}, inplace=True)

        # Drop rows with any missing values
        df.dropna(axis=0, how='any', inplace=True)
        return df
    except Exception as e:
        print(f"Error transforming dataframe from previous task: {e}")

c. Loading data into a PostgreSQL database

After transformation, our data is ready for loading into the database. The dimension tables are loaded first, then the fact table, so that every foreign key in the fact table resolves to an existing dimension row.

Below is a code snippet for loading market data into the market dimension table.

import os
from sqlalchemy import create_engine

def loading_market_data(clean_df):
    try:
        # One row per market, keeping only the market dimension's attributes
        market_df = clean_df[['market_id', 'province', 'county', 'latitude', 'longitude']].drop_duplicates()
        market_df.dropna(axis=0, how='any', inplace=True)
        engine = create_engine(os.getenv("PSQL_URI"))
        try:
            market_df.to_sql('dim_market', con=engine, index=False, schema='foodprices', if_exists='append')
            print("Data loaded into PostgreSQL successfully")
        except Exception as e:
            print(f"Error loading into dim_market table: {e}")
    except Exception as e:
        print(f"Error loading clean dataframe from previous task: {e}")
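
For completeness, here is a similar sketch for the fact table load. The table name fact_food_prices and its column list are my assumptions based on the columns kept during transformation, not the repository's exact code.

def loading_fact_data(clean_df):
    # Keep only the foreign keys and measures; table name/columns assumed (see above)
    fact_df = clean_df[['market_id', 'commodity_id', 'date', 'price', 'usdprice']]
    engine = create_engine(os.getenv("PSQL_URI"))
    try:
        # Runs after the dimension loads so every foreign key resolves
        fact_df.to_sql('fact_food_prices', con=engine, index=False, schema='foodprices', if_exists='append')
        print("Fact data loaded into PostgreSQL successfully")
    except Exception as e:
        print(f"Error loading into fact_food_prices table: {e}")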

d. Workflow Automation using Airflow

  • Airflow DAGs are used to schedule the pipeline to run monthly.

  • Each task is logged, and failure alerts are handled through retries and error logging.

  • Logs are also tracked to ensure data lineage in the pipeline.

  • Airflow is then deployed to production on an Azure Virtual Machine.

Below is an example of a DAG used to automate the inflation data pipeline:

from datetime import datetime, timedelta
from email.mime.text import MIMEText
import os
import smtplib

import pandas as pd
import requests
from airflow.decorators import dag, task
from sqlalchemy import create_engine

# Retry failed tasks on transient errors; these exact values are illustrative
default_args = {'retries': 2, 'retry_delay': timedelta(minutes=5)}

@dag(dag_id='inflation_pipeline_dag', default_args=default_args, start_date=datetime(2025, 7, 19), schedule_interval='@monthly', catchup=False)
def inflation_pipeline_dag():
    @task
    def extract_data():
        # World Bank CPI indicator (FP.CPI.TOTL) for Kenya, 2006-2024
        url = "https://api.worldbank.org/v2/country/ke/indicator/FP.CPI.TOTL?format=json&date=2006:2024"

        inflation_data = []
        response = requests.get(url)
        if response.status_code == 200:
            cpi_data = response.json()
            # The first element is pagination metadata; the records are in cpi_data[1]
            for record in cpi_data[1]:
                inflation_data.append(
                    {
                        'year': record['date'],
                        'value': record['value'],
                        'indicator': record['indicator']['value']
                    }
                )
        else:
            print(f"Error extracting CPI data: {response.status_code}, {response.text}")

        df = pd.DataFrame(inflation_data)
        return df

    @task
    def transform_data(df):
        # Parse the year string into a proper datetime for time-series alignment
        df['year'] = pd.to_datetime(df['year'], format='%Y')
        return df

    @task
    def load_inflation_data(clean_df):
        engine = create_engine(os.getenv("PSQL_URI"))
        try:
            clean_df.to_sql('inflation_data', con=engine, schema='inflation', index=False, if_exists='append')
            print("Data loaded into inflation_data successfully!")
        except Exception as e:
            print(f"Error loading into inflation_data: {e}")

    @task
    def send_email():
        body = """
                Inflation pipeline DAG has run successfully, please check it out
                """
        subject = "Inflation Pipeline DAG Notification"
        sender = os.getenv("SENDER")
        recipients = [os.getenv('RECIPIENT')]
        pwd = os.getenv("PASSWORD")

        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = sender
        msg["To"] = recipients[0]

        # Send the notification over Gmail's SSL SMTP endpoint
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp_server:
            try:
                smtp_server.login(sender, pwd)
                smtp_server.sendmail(sender, recipients, msg.as_string())
                print("Email sent successfully!")
            except Exception as e:
                print(f"Error sending email to recipient: {e}")

    df = extract_data()
    clean_df = transform_data(df)
    loading = load_inflation_data(clean_df)
    email_sent = send_email()

    # Send the notification only after the load completes
    loading >> email_sent

inflation = inflation_pipeline_dag()

e. Visualization and analysis

  • Transformed data is analyzed in Grafana.

  • Trends like monthly price inflation, food price differences per county, and the average price change of different food categories over time are tracked; an example panel query is sketched below.
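
As an illustration, below is a hypothetical panel query of the kind Grafana could run against the warehouse, previewed here with pandas. The table and column names follow the assumed schema sketched earlier, not the actual dashboard's queries.

import os

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical panel query: average monthly price per county
QUERY = """
SELECT date_trunc('month', f.date) AS month,
       m.county,
       AVG(f.price) AS avg_price_kes
FROM foodprices.fact_food_prices f
JOIN foodprices.dim_market m USING (market_id)
GROUP BY month, m.county
ORDER BY month;
"""

engine = create_engine(os.getenv("PSQL_URI"))
preview = pd.read_sql(QUERY, engine)
print(preview.head())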

Below is a snapshot of the Grafana dashboard. To access the dashboard, follow this link.

Grafana Dashboard

Insights

For most food categories, prices spiked from around the start of 2022 until the end of 2023. Several factors may have contributed to this:

  • Currency depreciation: As the inflation chart shows, inflation in Kenya quadrupled between 2020 and 2023, which may have driven the spike in food prices around 2022 (see the sketch after this list for how the rate is derived from the CPI).
  • Drought and other climatic conditions: In 2022, Kenya experienced below-average rainfall, particularly in the northeastern regions, which suffered from prolonged drought. The year was also marked by significant temperature increases, making it one of the warmest on record. This can depress food production and raise food prices, especially in arid and semi-arid areas.
  • High input costs: The Russia-Ukraine war disrupted industries such as fuel, a key input to agriculture since foodstuffs must be transported from farms to markets, which pushed food prices higher.
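
One caveat on the first point: FP.CPI.TOTL is a CPI index rather than an inflation rate, so the year-over-year rate has to be derived from it. Below is a minimal pandas sketch, assuming a DataFrame shaped like the one returned by the DAG's extract_data task above.

import pandas as pd

def yoy_inflation(cpi_df: pd.DataFrame) -> pd.DataFrame:
    # cpi_df has 'year' and 'value' columns, as produced by extract_data above
    cpi_df = cpi_df.sort_values('year').copy()
    # Year-over-year % change in the CPI index gives the annual inflation rate
    cpi_df['inflation_pct'] = cpi_df['value'].pct_change() * 100
    return cpi_df[['year', 'inflation_pct']]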

Challenges faced while building this pipeline

a. Missing data

Some food categories and commodities were missing data for certain time periods; for example, the vegetables and fruits category had no data from 2006 to 2018.

b. Data Normalization

Inconsistent data had to be normalized into consistent units of mass, price, etc. Some commodities were quoted per kilogram and others per 50-kilogram bag, so these had to be normalized for easier analysis.
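
As an illustration, here is a minimal pandas sketch of that kind of normalization, assuming a unit column with labels like 'KG' and '50 KG'; the actual labels in the HDX data may differ.

import pandas as pd

def normalize_to_per_kg(df: pd.DataFrame) -> pd.DataFrame:
    # Convert 50 kg bag prices to per-kilogram prices so commodities compare fairly.
    # The unit labels 'KG' and '50 KG' are illustrative assumptions.
    df = df.copy()
    is_bag = df['unit'].str.strip().eq('50 KG')
    df.loc[is_bag, 'price'] = df.loc[is_bag, 'price'] / 50
    df.loc[is_bag, 'unit'] = 'KG'
    return df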

c. Lack of data sources

There was a lack of data sources for some Kenyan counties and commodities, so the analysis covers only the commodities with available data.

Conclusion

This project was a deep dive into how data engineering can be used for social impact. If you're working on something similar or want to collaborate, feel free to connect!

Also, like, comment and follow me for more data engineering content.
