DEV Community

Cover image for Python Meets SQL: Pandas and Databases Together
Akhilesh
Akhilesh

Posted on

Python Meets SQL: Pandas and Databases Together

Every skill you have built across four phases comes together here.

Python for logic and transformation. Pandas for analysis. SQL for querying databases. They are not separate tools. In real data work they form a single pipeline. SQL pulls data from the database efficiently. Pandas transforms and analyzes it in memory. Python orchestrates the whole thing.

This post is about connecting them properly, doing it safely, and building the patterns you will use in every data project going forward.


Three Ways to Connect Python to a Database

SQLite lives inside Python's standard library. No installation. No server. The database is a single file. Perfect for learning, prototyping, and small applications.

import sqlite3
conn = sqlite3.connect("mydata.db")        # file on disk
conn = sqlite3.connect(":memory:")         # lives in RAM, gone when script ends
Enter fullscreen mode Exit fullscreen mode

SQLAlchemy is the universal database connector for Python. One interface, every database. PostgreSQL, MySQL, SQLite, Oracle, SQL Server, all use the same API.

pip install sqlalchemy psycopg2-binary     # for PostgreSQL
pip install sqlalchemy pymysql             # for MySQL
Enter fullscreen mode Exit fullscreen mode
from sqlalchemy import create_engine

sqlite_engine   = create_engine("sqlite:///mydata.db")
postgres_engine = create_engine("postgresql://user:password@localhost:5432/dbname")
mysql_engine    = create_engine("mysql+pymysql://user:password@localhost:3306/dbname")
Enter fullscreen mode Exit fullscreen mode

pandas.read_sql does not care which database you use as long as you pass a valid connection. This is the function that bridges SQL queries directly into DataFrames.


The Full Setup

import sqlite3
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///analytics.db")

with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS customers (
            customer_id  INTEGER PRIMARY KEY,
            name         TEXT,
            city         TEXT,
            segment      TEXT,
            created_date TEXT
        )
    """))

    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS transactions (
            txn_id       INTEGER PRIMARY KEY,
            customer_id  INTEGER,
            amount       REAL,
            category     TEXT,
            txn_date     TEXT,
            status       TEXT
        )
    """))
    conn.commit()

customers_data = pd.DataFrame({
    "customer_id":  range(1, 9),
    "name":         ["Alex", "Priya", "Sam", "Jordan", "Lisa", "Ravi", "Tom", "Nina"],
    "city":         ["Mumbai", "Delhi", "Bangalore", "Mumbai", "Chennai", "Delhi", "Bangalore", "Mumbai"],
    "segment":      ["Premium", "Standard", "Standard", "Premium", "Standard", "Premium", "Standard", "Premium"],
    "created_date": ["2022-01-10", "2021-06-15", "2023-03-01", "2020-11-22",
                     "2022-08-30", "2021-02-14", "2023-07-05", "2021-09-18"]
})

np.random.seed(42)
n = 50
transactions_data = pd.DataFrame({
    "txn_id":      range(1, n + 1),
    "customer_id": np.random.randint(1, 9, n),
    "amount":      np.random.uniform(500, 80000, n).round(0),
    "category":    np.random.choice(["Electronics", "Clothing", "Food", "Travel"], n),
    "txn_date":    pd.date_range("2024-01-01", periods=n, freq="3D").strftime("%Y-%m-%d"),
    "status":      np.random.choice(["completed", "completed", "completed", "cancelled"], n)
})

customers_data.to_sql("customers", engine, if_exists="replace", index=False)
transactions_data.to_sql("transactions", engine, if_exists="replace", index=False)

print("Data loaded successfully.")
print(f"Customers: {len(customers_data)} rows")
print(f"Transactions: {len(transactions_data)} rows")
Enter fullscreen mode Exit fullscreen mode

to_sql writes a DataFrame directly to a database table. if_exists="replace" drops and recreates the table. if_exists="append" adds rows to an existing table. if_exists="fail" raises an error if the table exists.

index=False prevents Pandas from writing the DataFrame index as an extra column. Almost always what you want.


Reading Data: pd.read_sql_query

df = pd.read_sql_query(
    "SELECT * FROM customers",
    engine
)
print(df)
Enter fullscreen mode Exit fullscreen mode

Simple. Pass any SQL string and a connection or engine. Get a DataFrame back.

Parameterized queries with SQLAlchemy:

city = "Mumbai"

df = pd.read_sql_query(
    text("SELECT * FROM customers WHERE city = :city"),
    engine,
    params={"city": city}
)
print(df)
Enter fullscreen mode Exit fullscreen mode

Never use f-strings to build SQL queries with user input. This is how SQL injection attacks happen. Always use parameterized queries when values come from outside your code.

# WRONG - SQL injection risk
city = "Mumbai"
df = pd.read_sql_query(f"SELECT * FROM customers WHERE city = '{city}'", engine)

# CORRECT - parameterized
df = pd.read_sql_query(
    text("SELECT * FROM customers WHERE city = :city"),
    engine,
    params={"city": city}
)
Enter fullscreen mode Exit fullscreen mode

The Pipeline Pattern: SQL Filters, Pandas Transforms

The most common real-world pattern. Use SQL to pull only what you need. Use Pandas for everything else.

df = pd.read_sql_query("""
    SELECT
        c.customer_id,
        c.name,
        c.city,
        c.segment,
        COUNT(t.txn_id)       AS total_transactions,
        SUM(t.amount)         AS total_spent,
        AVG(t.amount)         AS avg_transaction,
        MAX(t.txn_date)       AS last_transaction
    FROM customers c
    LEFT JOIN transactions t
        ON c.customer_id = t.customer_id
        AND t.status = 'completed'
    GROUP BY c.customer_id, c.name, c.city, c.segment
    ORDER BY total_spent DESC NULLS LAST
""", engine)

df["last_transaction"] = pd.to_datetime(df["last_transaction"])
df["days_since_last"]  = (pd.Timestamp.now() - df["last_transaction"]).dt.days
df["customer_value"]   = pd.cut(
    df["total_spent"].fillna(0),
    bins=[0, 10000, 50000, float("inf")],
    labels=["Low", "Medium", "High"]
)

print(df[["name", "segment", "total_spent", "avg_transaction", "days_since_last", "customer_value"]])
Enter fullscreen mode Exit fullscreen mode

Output:

    name   segment  total_spent  avg_transaction  days_since_last customer_value
0   Alex   Premium     238500.0         29812.5             24.0           High
1  Priya  Standard     198000.0         28285.7             15.0           High
...
Enter fullscreen mode Exit fullscreen mode

SQL handled the aggregation across 50 rows of transaction data before it even touched Python. Pandas then added date calculations and customer value tiers that SQL could not do as cleanly.


Writing Results Back to the Database

Analysis results that others need should live in the database, not in your laptop's memory.

summary = df[["customer_id", "total_spent", "total_transactions", "customer_value", "days_since_last"]].copy()
summary["calculated_at"] = pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S")

summary.to_sql(
    "customer_summary",
    engine,
    if_exists="replace",
    index=False
)

verification = pd.read_sql_query("SELECT * FROM customer_summary", engine)
print(f"Summary table written: {len(verification)} rows")
print(verification.head(3))
Enter fullscreen mode Exit fullscreen mode

The summary is now queryable by anyone with database access. A dashboard tool can read it. Another analyst can join it. An application can serve it to users.


Chunked Reading for Large Tables

When a table has millions of rows, loading it all into memory crashes things. Read it in chunks.

chunk_size = 1000
total_revenue = 0
row_count = 0

for chunk in pd.read_sql_query(
    "SELECT amount, status FROM transactions",
    engine,
    chunksize=chunk_size
):
    completed = chunk[chunk["status"] == "completed"]
    total_revenue += completed["amount"].sum()
    row_count += len(completed)

print(f"Processed {row_count} completed transactions")
print(f"Total revenue: {total_revenue:,.0f}")
Enter fullscreen mode Exit fullscreen mode

Each chunk is a DataFrame of chunk_size rows. Process it, extract what you need, let it get garbage-collected. You never hold more than one chunk in memory at once.


Context Managers and Connection Safety

Database connections are resources. Always close them properly. Context managers guarantee this.

with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM transactions"))
    count = result.scalar()
    print(f"Transaction count: {count}")

with sqlite3.connect("analytics.db") as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM customers")
    count = cursor.fetchone()[0]
    print(f"Customer count: {count}")
Enter fullscreen mode Exit fullscreen mode

When the with block ends, the connection closes automatically. Even if an exception is raised inside the block. Never rely on garbage collection to close database connections. Always use context managers.


Transactions: All or Nothing

When you write multiple related records, they should either all succeed or all fail together. A transaction guarantees this.

def add_customer_with_transaction(engine, customer_data, initial_transaction):
    with engine.begin() as conn:
        conn.execute(
            text("""
                INSERT INTO customers (customer_id, name, city, segment, created_date)
                VALUES (:id, :name, :city, :segment, :date)
            """),
            {"id": customer_data["id"], "name": customer_data["name"],
             "city": customer_data["city"], "segment": customer_data["segment"],
             "date": customer_data["date"]}
        )

        conn.execute(
            text("""
                INSERT INTO transactions (txn_id, customer_id, amount, category, txn_date, status)
                VALUES (:txn_id, :customer_id, :amount, :category, :date, 'completed')
            """),
            {"txn_id": initial_transaction["txn_id"],
             "customer_id": customer_data["id"],
             "amount": initial_transaction["amount"],
             "category": initial_transaction["category"],
             "date": initial_transaction["date"]}
        )

    print(f"Customer {customer_data['name']} and first transaction added successfully.")

new_customer = {"id": 9, "name": "Oscar", "city": "Pune",
                "segment": "Standard", "date": "2024-03-01"}
first_txn    = {"txn_id": 51, "amount": 15000, "category": "Electronics", "date": "2024-03-01"}

add_customer_with_transaction(engine, new_customer, first_txn)
Enter fullscreen mode Exit fullscreen mode

engine.begin() starts a transaction. If any statement inside raises an exception, all changes are rolled back. The customer and the transaction are inserted together or not at all. No orphaned records.


A Complete Data Pipeline

Putting everything together. Extract from database, transform in Pandas, load back.

def run_etl_pipeline(engine):
    print("Step 1: Extract")
    raw = pd.read_sql_query("""
        SELECT
            t.txn_id, t.customer_id, t.amount, t.category,
            t.txn_date, t.status, c.city, c.segment
        FROM transactions t
        JOIN customers c ON t.customer_id = c.customer_id
        WHERE t.status = 'completed'
    """, engine)
    print(f"  Extracted {len(raw)} rows")

    print("Step 2: Transform")
    raw["txn_date"]  = pd.to_datetime(raw["txn_date"])
    raw["month"]     = raw["txn_date"].dt.to_period("M").astype(str)
    raw["amount_usd"] = (raw["amount"] / 83).round(2)

    monthly_summary = raw.groupby(["month", "category", "segment"]).agg(
        transactions=("txn_id", "count"),
        revenue=("amount", "sum"),
        avg_value=("amount", "mean")
    ).reset_index()
    monthly_summary["avg_value"] = monthly_summary["avg_value"].round(0)
    print(f"  Transformed to {len(monthly_summary)} summary rows")

    print("Step 3: Load")
    monthly_summary.to_sql("monthly_summary", engine, if_exists="replace", index=False)
    print("  Loaded to monthly_summary table")

    result = pd.read_sql_query(
        "SELECT * FROM monthly_summary ORDER BY month, revenue DESC",
        engine
    )
    print("\nFinal output:")
    print(result.head(10))

run_etl_pipeline(engine)
Enter fullscreen mode Exit fullscreen mode

Extract, transform, load. Every data pipeline you will ever build follows this shape. The tools change. The databases change. The pattern does not.


A Resource Worth Knowing

Robin Moffatt wrote a widely-read piece called "Kafka, JDBC and the Problem of Schema Evolution" but his more practical content lives on the Confluent blog where he writes about database connectivity patterns in Python for real production systems. His writing on SQLAlchemy connection pooling and transaction management is some of the most practical content available. Search "Robin Moffatt SQLAlchemy Python production."

For SQLAlchemy specifically, the documentation at docs.sqlalchemy.org/en/14/core/tutorial.html is unusually good. The "Core Tutorial" section reads like a well-written blog post, not a reference manual. Worth reading alongside this post.


Try This

Create python_sql_practice.py.

Build a complete mini analytics system on the database from this post.

Part 1: Segmentation pipeline. Query all completed transactions. In Pandas, calculate each customer's total spending, transaction count, average transaction value, and days since last purchase. Write the result back to a table called customer_rfm.

Part 2: Category report. Using a single SQL query (no Pandas processing), get revenue by category and month. Use a window function to add a column showing each category's percentage of total monthly revenue. Load into a DataFrame and print.

Part 3: Anomaly detection. Read the transactions table. In Pandas, calculate z-scores for the amount column. Flag any transaction more than 2 standard deviations from the mean as an anomaly. Write these flagged transactions to a table called transaction_anomalies.

Part 4: Pipeline function. Wrap all three parts into a single run_analysis(engine) function. It should print progress messages, handle exceptions with try/except, and print a final summary showing how many rows were written to each output table.


Phase 4 Complete

Five posts. From basic SELECT statements to joining three tables, subqueries, CTEs, window functions, and full Python-SQL integration.

SQL is not a separate subject from data science. It is the first tool you use in almost every analysis. The data lives in a database. SQL is how you get it out.

Phase 5 next. Three posts covering Git, GitHub, Jupyter, and Google Colab. Short phase. High leverage. These are the tools that make your work reproducible, shareable, and professional.

Top comments (0)