Gathuru_M

ETL Pipeline: Fetching Real-Time News Data with Python and Postgres

The best way to actually understand data engineering is to build something that breaks, fix it, and watch it successfully run.

In this article, we build an ETL pipeline that pulls data from the News API, cleans it up using pandas, and loads it into a local PostgreSQL database.

If you are a beginner Python developer or just getting into data engineering, this one is for you!


The Goal & The Architecture

Before writing a single line of code, let’s look at what we are actually trying to achieve:

  1. Extract: Connect to the News API using Python, query its everything endpoint for articles about technology, and pull down the raw data.
  2. Transform: The raw data comes back as a messy, nested JSON object. We'll use pandas to flatten it, pick the columns we actually care about, handle missing values, and format the dates.
  3. Load: Connect to a local PostgreSQL database and append our clean data into a structured table.
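
As a rough preview of the Transform step, here is a minimal sketch on one made-up article shaped like an entry in the API's articles list. It uses pd.json_normalize to flatten the nested source field, which is an alternative to the manual loop in the full script further down, not the approach that script takes. The sample values are invented for illustration.

```python
import pandas as pd

# Made-up sample shaped like one entry of the API's "articles" list
sample_articles = [
    {
        "source": {"id": None, "name": "Example News"},
        "author": None,
        "title": "A sample headline",
        "description": None,
        "url": "https://example.com/a-sample-headline",
        "publishedAt": "2026-06-07T06:00:00Z",
    }
]

# json_normalize expands the nested "source" dict into "source.id" / "source.name"
flat = pd.json_normalize(sample_articles)

# Keep only the columns we care about and rename them to match the table
columns = {
    "source.name": "source",
    "author": "author",
    "title": "title",
    "description": "description",
    "url": "url",
    "publishedAt": "published_at",
}
df = flat[list(columns)].rename(columns=columns)

# Handle missing values and parse the ISO 8601 timestamps
df["author"] = df["author"].fillna("Unknown")
df["description"] = df["description"].fillna("No Description")
df["published_at"] = pd.to_datetime(df["published_at"], utc=True)

print(df.iloc[0].to_dict())
```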

Step 1: Setting Up the Database

First, we need a place for our data to live. I used a PostgreSQL instance hosted in the cloud on Aiven; a local PostgreSQL install works the same way for this tutorial.

We need a clean target table. Here is the SQL script I used to create a simple news_articles table. Notice how we have to be careful with our data types (like using TIMESTAMP for dates and TEXT for long URLs).

CREATE TABLE IF NOT EXISTS news_articles (
    id SERIAL PRIMARY KEY,
    source VARCHAR(100),
    author VARCHAR(150),
    title TEXT,
    description TEXT,
    url TEXT,
    published_at TIMESTAMP,
    extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Below is a diagram of the news_articles table created.

A diagram of the news_articles table created

Step 2: The Python ETL Script

To keep things clean and modular, I broke the code down into three distinct functions representing E, T, and L.

Make sure you have your dependencies installed before running this:

pip install requests pandas psycopg2-binary sqlalchemy 


Here is the full, documented script:

import os

import pandas as pd
import requests
from sqlalchemy import create_engine

# Configuration
API_KEY = "YOUR_NEWS_API_KEY"
URL = f"https://newsapi.org/v2/everything?q=technology&apiKey={API_KEY}"

DB_PARAMS = {
    "host": "localhost",
    "database": "your_db_name",
    "user": "your_username",
    "password": "your_password",
    "port": "5432"
}

def extract_data():
    print("--- Starting Extraction ---")
    response = requests.get(URL, timeout=30)  # avoid hanging forever if the API is unreachable
    if response.status_code == 200:
        data = response.json()
        articles = data.get("articles", [])
        print(f"Successfully extracted {len(articles)} articles.")
        return articles
    else:
        raise Exception(f"API Request Failed with status code: {response.status_code}")

# Transforming Data

def transform_data(articles):

    # Create an empty list to store clean articles after looping
    cleaned_data = []

    # Parse through each dictionary to extract what we need
    for article in articles:
        clean_article = {
            'source' : article.get('source', {}).get('name', 'Unknown'),
            'author' : article.get('author'),
            'title' : article.get('title'),
            'description' : article.get('description'),
            'url' : article.get('url'),
            'publishedAt' : article.get('publishedAt')
        }

        cleaned_data.append(clean_article)

    df = pd.DataFrame(cleaned_data)

    # Rename column to match Postgres fields
    df = df.rename(columns={'publishedAt': 'published_at'})

    # Handle missing values  
    df['author'] = df['author'].fillna('Unknown')
    df['description'] = df['description'].fillna('No Description')

    # Format dates: parse the ISO 8601 strings from the API into datetimes
    df['published_at'] = pd.to_datetime(df['published_at'])

    print("Transformation complete!")
    return df

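def load_data(df):

    # SQLAlchemy connection string read from the URI environment variable
    db_URI = os.getenv('URI')
    if not db_URI:
        raise ValueError("The URI environment variable is not set")

    try:
        engine = create_engine(db_URI)

        df.to_sql(
            name = 'news_articles',
            con = engine,
            schema = 'news_api',  # must be the schema that holds news_articles
            if_exists = 'append',
            index = False
        )
        print("Data loaded successfully to 'news_articles'")
    except Exception as e:
        print(f"Failed to load data to the database: {e}")
        raise  # re-raise so the pipeline does not report success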

# Run the pipeline
if __name__ == "__main__":
    try:
        raw_data = extract_data()
        cleaned_df = transform_data(raw_data)
        load_data(cleaned_df)
        print("ETL Pipeline Finished Successfully! 🎉")
    except Exception as e:
        print(f"Pipeline failed: {e}")
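
The script defines DB_PARAMS, but load_data reads a ready-made connection string from the URI environment variable. One way to bridge the two is to build a SQLAlchemy-style URL from a DB_PARAMS-shaped dict. The sketch below is only an illustration: build_db_uri is a hypothetical helper that is not part of the original script, and the credentials are placeholders.

```python
from urllib.parse import quote_plus

def build_db_uri(params):
    """Build a SQLAlchemy connection string from a DB_PARAMS-style dict (hypothetical helper)."""
    # URL-encode the credentials so characters like '@' or '/' don't break the URI
    user = quote_plus(params["user"])
    password = quote_plus(params["password"])
    return (
        f"postgresql+psycopg2://{user}:{password}"
        f"@{params['host']}:{params['port']}/{params['database']}"
    )

# Placeholder values only
example_params = {
    "host": "localhost",
    "database": "your_db_name",
    "user": "your_username",
    "password": "p@ss/word",
    "port": "5432",
}
db_uri = build_db_uri(example_params)
print(db_uri)
```

The resulting string could be passed straight to create_engine, or exported as the URI environment variable before running the script.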


Step 3: Running the Pipeline and Verifying the Results

When I first ran this script, I hit a classic beginner issue: the dates coming from the API are ISO 8601 strings ending in Z (e.g., 2026-06-07T06:00:00Z), where the Z means UTC. My database complained about them until I parsed them with pd.to_datetime().
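
To see what that parsing does, here is a small sketch on the example timestamp above plus a missing value. The utc=True and errors="coerce" arguments, and the optional step that drops the timezone, are my additions for illustration; the script itself calls pd.to_datetime() with no extra arguments.

```python
import pandas as pd

# Same shape as the API's publishedAt values; the trailing "Z" means UTC
raw = pd.Series(["2026-06-07T06:00:00Z", None])

# utc=True yields timezone-aware timestamps; errors="coerce" turns unparseable values into NaT
parsed = pd.to_datetime(raw, utc=True, errors="coerce")

# Optional: drop the timezone (values stay in UTC) to match a plain TIMESTAMP column
naive_utc = parsed.dt.tz_localize(None)

print(parsed.dtype)
print(naive_utc.iloc[0])
```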

But once those quirks were ironed out, running the script in the terminal yielded beautiful logs:

![Successful execution](https://dev-to-uploads.s3.amazonaws.com/uploads/articles/vhq6zbzsct5asbdocdcf.png)

To verify it actually worked, I hopped back into my database client and ran a simple query:

SELECT source, author, title, published_at FROM news_articles LIMIT 5;


And there it was—real live web data, organized neatly into my own database tables!

First 5 rows of extracted data from Postgres


Key Takeaways from My First Project

Building this taught me a few massive lessons that you don't get just by reading textbooks:

  • API Data is messy: You can almost never load API responses directly into a database. Nested dictionaries (like the source field in this project) require explicit flattening.
  • Idempotency matters: If I run this script twice right now, it will duplicate all the articles. As I move forward, I need to look into how to handle duplicates (like checking for existing URLs before inserting).
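
As a starting point for the duplicate problem, here is a minimal sketch of the "check for existing URLs" idea done in pandas before the load step. The helper drop_known_articles and the sample data are hypothetical; in the real pipeline the set of existing URLs would have to come from the database, for example from a query that selects the url column of news_articles.

```python
import pandas as pd

def drop_known_articles(df, existing_urls):
    """Return only rows whose URL is new (hypothetical helper)."""
    # Remove repeats within the batch itself, keeping the first occurrence
    unique_batch = df.drop_duplicates(subset="url", keep="first")
    # Remove rows whose URL is already stored
    is_new = ~unique_batch["url"].isin(existing_urls)
    return unique_batch[is_new].reset_index(drop=True)

# Made-up data for illustration
batch = pd.DataFrame({
    "title": ["A", "A again", "B"],
    "url": [
        "https://example.com/a",
        "https://example.com/a",
        "https://example.com/b",
    ],
})
already_loaded = {"https://example.com/b"}

new_rows = drop_known_articles(batch, already_loaded)
print(new_rows)
```

Filtering in Python like this is only a first line of defense. A UNIQUE constraint on the url column would let the database itself reject repeats, at the cost of handling the resulting insert errors.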

What's Next?

Running this manually via a Python script is great for practice, but what if I want this data updated every single morning at 6 AM while I'm asleep? I can't just sit here and click "Run" manually.

That is where Data Orchestration comes in. In my next article, we are going to look at Apache Airflow and how we can take this exact code and turn it into an automated, scheduled workflow!

Have you built an ETL pipeline before? What was the trickiest data cleaning issue you faced? Let’s chat in the comments!
