Every skill you have built across four phases comes together here.
Python for logic and transformation. Pandas for analysis. SQL for querying databases. They are not separate tools. In real data work they form a single pipeline. SQL pulls data from the database efficiently. Pandas transforms and analyzes it in memory. Python orchestrates the whole thing.
This post is about connecting them properly, doing it safely, and building the patterns you will use in every data project going forward.
Three Ways to Connect Python to a Database
SQLite lives inside Python's standard library. No installation. No server. The database is a single file. Perfect for learning, prototyping, and small applications.
import sqlite3
disk_conn = sqlite3.connect("mydata.db")    # file on disk
memory_conn = sqlite3.connect(":memory:")   # lives in RAM, gone when the connection closes
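To make the "no installation, no server" point concrete, here is a minimal sketch of a full round trip through an in-memory SQLite database using only the standard library. The table name `demo` and its columns are made up for this illustration.

```python
import sqlite3

# In-memory database: nothing is written to disk.
conn = sqlite3.connect(":memory:")
try:
    # "demo" is a hypothetical table used only for this illustration.
    conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY, label TEXT)")
    conn.executemany(
        "INSERT INTO demo (id, label) VALUES (?, ?)",
        [(1, "first"), (2, "second")],
    )
    conn.commit()
    rows = conn.execute("SELECT id, label FROM demo ORDER BY id").fetchall()
finally:
    # Close explicitly so the connection is released even if a statement fails.
    conn.close()

print(rows)
```

Swapping ":memory:" for a filename is the only change needed to persist the same data to disk.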
SQLAlchemy is the universal database connector for Python. One interface, every database. PostgreSQL, MySQL, SQLite, Oracle, SQL Server, all use the same API.
pip install sqlalchemy psycopg2-binary # for PostgreSQL
pip install sqlalchemy pymysql # for MySQL
from sqlalchemy import create_engine
sqlite_engine = create_engine("sqlite:///mydata.db")
postgres_engine = create_engine("postgresql://user:password@localhost:5432/dbname")
mysql_engine = create_engine("mysql+pymysql://user:password@localhost:3306/dbname")
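The connection strings above embed the password as a literal. One possible way to avoid that is to build the URL from parts and read the secret from the environment. This is only a sketch: the environment variable name `DB_PASSWORD` and the fallback value are assumptions for the demo, and the resulting `url` object can be passed to create_engine in place of a string.

```python
import os
from sqlalchemy.engine import URL

# DB_PASSWORD is a hypothetical environment variable name for this sketch.
# The fallback value exists only so the demo runs without any setup.
password = os.environ.get("DB_PASSWORD", "p@ss/word")

# URL.create escapes special characters in the password for you,
# which a hand-written connection string would not.
url = URL.create(
    drivername="postgresql",
    username="user",
    password=password,
    host="localhost",
    port=5432,
    database="dbname",
)

# hide_password=True masks the secret, which is what you want in logs.
safe_for_logs = url.render_as_string(hide_password=True)
print(safe_for_logs)
```

Building the URL does not open a connection or require the PostgreSQL driver. That only happens once an engine created from it is actually used.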
pandas.read_sql works with any database SQLAlchemy can reach: pass it an engine or a connection (a plain sqlite3 connection also works for SQLite). This is the function that bridges SQL queries directly into DataFrames.
The Full Setup
import sqlite3
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
engine = create_engine("sqlite:///analytics.db")
with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS customers (
            customer_id INTEGER PRIMARY KEY,
            name TEXT,
            city TEXT,
            segment TEXT,
            created_date TEXT
        )
    """))
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS transactions (
            txn_id INTEGER PRIMARY KEY,
            customer_id INTEGER,
            amount REAL,
            category TEXT,
            txn_date TEXT,
            status TEXT
        )
    """))
    conn.commit()
customers_data = pd.DataFrame({
    "customer_id": range(1, 9),
    "name": ["Alex", "Priya", "Sam", "Jordan", "Lisa", "Ravi", "Tom", "Nina"],
    "city": ["Mumbai", "Delhi", "Bangalore", "Mumbai", "Chennai", "Delhi", "Bangalore", "Mumbai"],
    "segment": ["Premium", "Standard", "Standard", "Premium", "Standard", "Premium", "Standard", "Premium"],
    "created_date": ["2022-01-10", "2021-06-15", "2023-03-01", "2020-11-22",
                     "2022-08-30", "2021-02-14", "2023-07-05", "2021-09-18"]
})
np.random.seed(42)
n = 50
transactions_data = pd.DataFrame({
    "txn_id": range(1, n + 1),
    "customer_id": np.random.randint(1, 9, n),
    "amount": np.random.uniform(500, 80000, n).round(0),
    "category": np.random.choice(["Electronics", "Clothing", "Food", "Travel"], n),
    "txn_date": pd.date_range("2024-01-01", periods=n, freq="3D").strftime("%Y-%m-%d"),
    "status": np.random.choice(["completed", "completed", "completed", "cancelled"], n)
})
customers_data.to_sql("customers", engine, if_exists="replace", index=False)
transactions_data.to_sql("transactions", engine, if_exists="replace", index=False)
print("Data loaded successfully.")
print(f"Customers: {len(customers_data)} rows")
print(f"Transactions: {len(transactions_data)} rows")
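to_sql writes a DataFrame directly to a database table. if_exists="replace" drops and recreates the table. if_exists="append" adds rows to an existing table. if_exists="fail" raises an error if the table exists. Note that "replace" rebuilds the table from the DataFrame's column types, so anything declared in an earlier CREATE TABLE, such as PRIMARY KEY, is not preserved.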
index=False prevents Pandas from writing the DataFrame index as an extra column. Almost always what you want.
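The three if_exists modes are easy to confuse, so here is a small sketch that runs each one against an in-memory SQLite database and counts the rows afterwards. The table name `demo` and the helper `row_count` are made up for this illustration.

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")
df = pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]})

def row_count(connection):
    """Hypothetical helper: number of rows currently in the demo table."""
    return connection.execute("SELECT COUNT(*) FROM demo").fetchone()[0]

# "replace" creates the table (or drops and recreates it if it exists).
df.to_sql("demo", conn, if_exists="replace", index=False)
after_replace = row_count(conn)

# "append" adds the same two rows again.
df.to_sql("demo", conn, if_exists="append", index=False)
after_append = row_count(conn)

# "fail" refuses to touch an existing table and raises ValueError.
try:
    df.to_sql("demo", conn, if_exists="fail", index=False)
    fail_raised = False
except ValueError:
    fail_raised = True

# A second "replace" throws away the appended rows.
df.to_sql("demo", conn, if_exists="replace", index=False)
after_second_replace = row_count(conn)
conn.close()

print(after_replace, after_append, fail_raised, after_second_replace)
```

"fail" is also the default when if_exists is omitted, which makes it the safest mode for tables you did not create yourself.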
Reading Data: pd.read_sql_query
df = pd.read_sql_query(
    "SELECT * FROM customers",
    engine
)
print(df)
Simple. Pass any SQL string and a connection or engine. Get a DataFrame back.
Parameterized queries with SQLAlchemy:
city = "Mumbai"
df = pd.read_sql_query(
    text("SELECT * FROM customers WHERE city = :city"),
    engine,
    params={"city": city}
)
print(df)
Never use f-strings to build SQL queries with user input. This is how SQL injection attacks happen. Always use parameterized queries when values come from outside your code.
# WRONG - SQL injection risk
city = "Mumbai"
df = pd.read_sql_query(f"SELECT * FROM customers WHERE city = '{city}'", engine)
# CORRECT - parameterized
df = pd.read_sql_query(
    text("SELECT * FROM customers WHERE city = :city"),
    engine,
    params={"city": city}
)
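To see why the f-string version is dangerous, it helps to run both styles against a hostile value. The sketch below uses the standard-library sqlite3 module, whose placeholder is `?` rather than SQLAlchemy's `:name`. The three-row table and the hostile input string are made up for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (name TEXT, city TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?)",
    [("Alex", "Mumbai"), ("Priya", "Delhi"), ("Sam", "Bangalore")],
)

# Hypothetical hostile input: the quote closes the string literal early
# and the OR clause makes the WHERE condition true for every row.
user_input = "nowhere' OR '1'='1"

# Unsafe: the input becomes part of the SQL text itself.
unsafe_sql = f"SELECT name FROM customers WHERE city = '{user_input}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Safe: the input is passed as a value and only ever compared to city.
safe_rows = conn.execute(
    "SELECT name FROM customers WHERE city = ?", (user_input,)
).fetchall()
conn.close()

print(len(unsafe_rows), len(safe_rows))
```

The unsafe query returns every customer even though no city matches, while the parameterized query returns nothing because no city is literally named "nowhere' OR '1'='1".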
The Pipeline Pattern: SQL Filters, Pandas Transforms
The most common real-world pattern. Use SQL to pull only what you need. Use Pandas for everything else.
# NULLS LAST in the ORDER BY needs SQLite 3.30 or newer
df = pd.read_sql_query("""
    SELECT
        c.customer_id,
        c.name,
        c.city,
        c.segment,
        COUNT(t.txn_id) AS total_transactions,
        SUM(t.amount) AS total_spent,
        AVG(t.amount) AS avg_transaction,
        MAX(t.txn_date) AS last_transaction
    FROM customers c
    LEFT JOIN transactions t
        ON c.customer_id = t.customer_id
        AND t.status = 'completed'
    GROUP BY c.customer_id, c.name, c.city, c.segment
    ORDER BY total_spent DESC NULLS LAST
""", engine)
df["last_transaction"] = pd.to_datetime(df["last_transaction"])
df["days_since_last"] = (pd.Timestamp.now() - df["last_transaction"]).dt.days
# include_lowest=True keeps customers with 0 spend in the "Low" tier;
# without it the first bin is (0, 10000] and a value of exactly 0 becomes NaN
df["customer_value"] = pd.cut(
    df["total_spent"].fillna(0),
    bins=[0, 10000, 50000, float("inf")],
    labels=["Low", "Medium", "High"],
    include_lowest=True
)
print(df[["name", "segment", "total_spent", "avg_transaction", "days_since_last", "customer_value"]])
Output:
    name   segment  total_spent  avg_transaction  days_since_last customer_value
0   Alex   Premium     238500.0          29812.5             24.0           High
1  Priya  Standard     198000.0          28285.7             15.0           High
...
SQL handled the aggregation across 50 rows of transaction data before it even touched Python. Pandas then added date calculations and customer value tiers that SQL could not do as cleanly.
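One detail worth checking whenever you bucket values with pd.cut: by default each bin excludes its left edge, so a value sitting exactly on the lowest edge gets no label. The sketch below uses the same bin edges as the tiering step, with made-up spend values, to show the difference include_lowest makes.

```python
import pandas as pd

# Made-up spend values; the first one sits exactly on the lowest bin edge.
spend = pd.Series([0, 5000, 20000, 90000])
bins = [0, 10000, 50000, float("inf")]
labels = ["Low", "Medium", "High"]

# Default: bins are (0, 10000], (10000, 50000], (50000, inf], so 0 falls outside.
default_tiers = pd.cut(spend, bins=bins, labels=labels)

# include_lowest=True makes the first bin closed on the left as well.
inclusive_tiers = pd.cut(spend, bins=bins, labels=labels, include_lowest=True)

print(default_tiers.tolist())
print(inclusive_tiers.tolist())
```

This matters for customers with no completed transactions: after fillna(0) their spend is exactly 0, and they only land in the "Low" tier when the lowest edge is included.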
Writing Results Back to the Database
Analysis results that others need should live in the database, not in your laptop's memory.
summary = df[["customer_id", "total_spent", "total_transactions", "customer_value", "days_since_last"]].copy()
summary["calculated_at"] = pd.Timestamp.now().strftime("%Y-%m-%d %H:%M:%S")
summary.to_sql(
"customer_summary",
engine,
if_exists="replace",
index=False
)
verification = pd.read_sql_query("SELECT * FROM customer_summary", engine)
print(f"Summary table written: {len(verification)} rows")
print(verification.head(3))
The summary is now queryable by anyone with database access. A dashboard tool can read it. Another analyst can join it. An application can serve it to users.
Chunked Reading for Large Tables
When a table has millions of rows, loading it all at once can exhaust your machine's memory. Read it in chunks.
chunk_size = 1000
total_revenue = 0
row_count = 0
for chunk in pd.read_sql_query(
    "SELECT amount, status FROM transactions",
    engine,
    chunksize=chunk_size
):
    completed = chunk[chunk["status"] == "completed"]
    total_revenue += completed["amount"].sum()
    row_count += len(completed)
print(f"Processed {row_count} completed transactions")
print(f"Total revenue: {total_revenue:,.0f}")
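Each chunk is a DataFrame of up to chunk_size rows. Process it, extract what you need, and let it get garbage-collected, so Pandas holds only one chunk as a DataFrame at a time. Be aware that some database drivers still buffer the full result set on the client unless you enable a server-side cursor, for example through SQLAlchemy's stream_results execution option.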
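A small sketch makes the chunk boundaries visible. It loads ten rows into an in-memory SQLite table (the table name `demo` is made up) and reads them back four at a time, recording the size of each chunk.

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")
# Ten rows with amounts 1..10 in a hypothetical demo table.
pd.DataFrame({"amount": range(1, 11)}).to_sql("demo", conn, index=False)

chunk_sizes = []
running_total = 0
# With chunksize set, read_sql_query returns an iterator of DataFrames.
for chunk in pd.read_sql_query("SELECT amount FROM demo", conn, chunksize=4):
    chunk_sizes.append(len(chunk))
    running_total += int(chunk["amount"].sum())
conn.close()

print(chunk_sizes, running_total)
```

The last chunk is smaller than the others whenever the row count is not a multiple of the chunk size, so any per-chunk logic should not assume a fixed length.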
Context Managers and Connection Safety
Database connections are resources. Always close them properly. Context managers guarantee this.
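from contextlib import closing
# SQLAlchemy: the connection is released when the block exits
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM transactions"))
    count = result.scalar()
    print(f"Transaction count: {count}")
# sqlite3: "with conn" alone only commits or rolls back, so closing() is what closes it
with closing(sqlite3.connect("analytics.db")) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM customers")
    count = cursor.fetchone()[0]
    print(f"Customer count: {count}")
When the with block ends, the SQLAlchemy connection is released automatically, even if an exception is raised inside the block. The sqlite3 module behaves differently: a sqlite3 connection used directly as a context manager commits or rolls back the transaction but stays open, which is why the second example wraps it in contextlib.closing. Never rely on garbage collection to close database connections. Always use context managers.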
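One caveat is worth verifying for the sqlite3 module specifically: using a sqlite3 connection directly in a with statement manages the transaction, not the connection's lifetime. The sketch below, with a made-up `demo` table, shows that the connection is still usable after such a block, and that contextlib.closing is one way to actually close it.

```python
import sqlite3
from contextlib import closing

# "with conn" commits on success (or rolls back on error) but does not close.
conn = sqlite3.connect(":memory:")
with conn:
    conn.execute("CREATE TABLE demo (id INTEGER)")
    conn.execute("INSERT INTO demo VALUES (1)")

# The connection is still open here, so this query succeeds.
still_open_count = conn.execute("SELECT COUNT(*) FROM demo").fetchone()[0]
conn.close()

# closing() calls conn2.close() when the block exits.
with closing(sqlite3.connect(":memory:")) as conn2:
    conn2.execute("SELECT 1")

# Using a closed sqlite3 connection raises ProgrammingError.
try:
    conn2.execute("SELECT 1")
    closed_after_block = False
except sqlite3.ProgrammingError:
    closed_after_block = True

print(still_open_count, closed_after_block)
```

SQLAlchemy's engine.connect() does not have this quirk: its context manager releases the connection when the block exits.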
Transactions: All or Nothing
When you write multiple related records, they should either all succeed or all fail together. A transaction guarantees this.
def add_customer_with_transaction(engine, customer_data, initial_transaction):
    with engine.begin() as conn:
        conn.execute(
            text("""
                INSERT INTO customers (customer_id, name, city, segment, created_date)
                VALUES (:id, :name, :city, :segment, :date)
            """),
            {"id": customer_data["id"], "name": customer_data["name"],
             "city": customer_data["city"], "segment": customer_data["segment"],
             "date": customer_data["date"]}
        )
        conn.execute(
            text("""
                INSERT INTO transactions (txn_id, customer_id, amount, category, txn_date, status)
                VALUES (:txn_id, :customer_id, :amount, :category, :date, 'completed')
            """),
            {"txn_id": initial_transaction["txn_id"],
             "customer_id": customer_data["id"],
             "amount": initial_transaction["amount"],
             "category": initial_transaction["category"],
             "date": initial_transaction["date"]}
        )
    print(f"Customer {customer_data['name']} and first transaction added successfully.")
new_customer = {"id": 9, "name": "Oscar", "city": "Pune",
                "segment": "Standard", "date": "2024-03-01"}
first_txn = {"txn_id": 51, "amount": 15000, "category": "Electronics", "date": "2024-03-01"}
add_customer_with_transaction(engine, new_customer, first_txn)
engine.begin() starts a transaction and commits it when the block exits cleanly. If any statement inside raises an exception, all changes are rolled back. The customer and the transaction are inserted together or not at all. No orphaned records.
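The rollback behavior is easy to confirm by forcing a failure between two inserts. The sketch below uses a throwaway SQLite file in a temporary directory. The `parent` and `child` tables and the helpers `insert_pair` and `counts` are made up for this demonstration.

```python
import os
import tempfile
from sqlalchemy import create_engine, text

# Throwaway database file so the demo leaves your real data untouched.
tmp_dir = tempfile.mkdtemp()
engine = create_engine(f"sqlite:///{os.path.join(tmp_dir, 'rollback_demo.db')}")

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE parent (id INTEGER PRIMARY KEY)"))
    conn.execute(text("CREATE TABLE child (id INTEGER PRIMARY KEY, parent_id INTEGER)"))

def insert_pair(fail_midway):
    """Insert a parent and a child row, optionally failing in between."""
    with engine.begin() as conn:
        conn.execute(text("INSERT INTO parent (id) VALUES (:id)"), {"id": 1})
        if fail_midway:
            raise RuntimeError("simulated failure between the two inserts")
        conn.execute(
            text("INSERT INTO child (id, parent_id) VALUES (:id, :pid)"),
            {"id": 1, "pid": 1},
        )

def counts():
    """Return (parent rows, child rows)."""
    with engine.connect() as conn:
        parents = conn.execute(text("SELECT COUNT(*) FROM parent")).scalar()
        children = conn.execute(text("SELECT COUNT(*) FROM child")).scalar()
    return parents, children

try:
    insert_pair(fail_midway=True)
except RuntimeError:
    pass
after_failure = counts()   # the parent insert was rolled back too

insert_pair(fail_midway=False)
after_success = counts()   # both rows committed together
engine.dispose()

print(after_failure, after_success)
```

If the parent insert had survived the failure, the second call would hit a primary key conflict. It succeeds because the rollback left no partial data behind.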
A Complete Data Pipeline
Putting everything together. Extract from database, transform in Pandas, load back.
def run_etl_pipeline(engine):
    print("Step 1: Extract")
    raw = pd.read_sql_query("""
        SELECT
            t.txn_id, t.customer_id, t.amount, t.category,
            t.txn_date, t.status, c.city, c.segment
        FROM transactions t
        JOIN customers c ON t.customer_id = c.customer_id
        WHERE t.status = 'completed'
    """, engine)
    print(f"  Extracted {len(raw)} rows")
    print("Step 2: Transform")
    raw["txn_date"] = pd.to_datetime(raw["txn_date"])
    raw["month"] = raw["txn_date"].dt.to_period("M").astype(str)
    # converts at a fixed rate of 83 per USD, hard-coded for illustration
    raw["amount_usd"] = (raw["amount"] / 83).round(2)
    monthly_summary = raw.groupby(["month", "category", "segment"]).agg(
        transactions=("txn_id", "count"),
        revenue=("amount", "sum"),
        avg_value=("amount", "mean")
    ).reset_index()
    monthly_summary["avg_value"] = monthly_summary["avg_value"].round(0)
    print(f"  Transformed to {len(monthly_summary)} summary rows")
    print("Step 3: Load")
    monthly_summary.to_sql("monthly_summary", engine, if_exists="replace", index=False)
    print("  Loaded to monthly_summary table")
    result = pd.read_sql_query(
        "SELECT * FROM monthly_summary ORDER BY month, revenue DESC",
        engine
    )
    print("\nFinal output:")
    print(result.head(10))
run_etl_pipeline(engine)
Extract, transform, load. Most data pipelines you build will follow this shape. The tools change. The databases change. The pattern does not.
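One possible refinement of the same shape, offered as a sketch rather than the only way to structure it, is to split the three stages into separate functions. The transform step then becomes a plain DataFrame-in, DataFrame-out function that can be checked without a database. The function names and the four sample rows below are made up for illustration.

```python
import sqlite3
import pandas as pd

def extract(conn):
    """Pull only completed transactions from the database."""
    return pd.read_sql_query(
        "SELECT txn_id, amount, category, txn_date FROM transactions "
        "WHERE status = 'completed'",
        conn,
    )

def transform(raw):
    """Pure function: no database access, so it is easy to test."""
    out = raw.copy()
    out["month"] = pd.to_datetime(out["txn_date"]).dt.to_period("M").astype(str)
    return (
        out.groupby(["month", "category"], as_index=False)
        .agg(transactions=("txn_id", "count"), revenue=("amount", "sum"))
    )

def load(summary, conn):
    """Write the summary back and report how many rows were written."""
    summary.to_sql("monthly_summary", conn, if_exists="replace", index=False)
    return len(summary)

# Tiny made-up dataset in an in-memory database to exercise all three stages.
conn = sqlite3.connect(":memory:")
pd.DataFrame({
    "txn_id": [1, 2, 3, 4],
    "amount": [100.0, 250.0, 400.0, 999.0],
    "category": ["Food", "Food", "Travel", "Food"],
    "txn_date": ["2024-01-05", "2024-01-20", "2024-02-01", "2024-02-02"],
    "status": ["completed", "completed", "completed", "cancelled"],
}).to_sql("transactions", conn, index=False)

summary = transform(extract(conn))
rows_written = load(summary, conn)
conn.close()

print(summary)
print(rows_written)
```

The cancelled transaction never reaches Pandas because the SQL filter removes it during extraction, which is the same division of labor the pipeline above relies on.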
A Resource Worth Knowing
Robin Moffatt wrote a widely-read piece called "Kafka, JDBC and the Problem of Schema Evolution" but his more practical content lives on the Confluent blog where he writes about database connectivity patterns in Python for real production systems. His writing on SQLAlchemy connection pooling and transaction management is some of the most practical content available. Search "Robin Moffatt SQLAlchemy Python production."
For SQLAlchemy specifically, the documentation at docs.sqlalchemy.org/en/14/core/tutorial.html is unusually good. The "Core Tutorial" section reads like a well-written blog post, not a reference manual. Worth reading alongside this post.
Try This
Create python_sql_practice.py.
Build a complete mini analytics system on the database from this post.
Part 1: Segmentation pipeline. Query all completed transactions. In Pandas, calculate each customer's total spending, transaction count, average transaction value, and days since last purchase. Write the result back to a table called customer_rfm.
Part 2: Category report. Using a single SQL query (no Pandas processing), get revenue by category and month. Use a window function to add a column showing each category's percentage of total monthly revenue. Load into a DataFrame and print.
Part 3: Anomaly detection. Read the transactions table. In Pandas, calculate z-scores for the amount column. Flag any transaction more than 2 standard deviations from the mean as an anomaly. Write these flagged transactions to a table called transaction_anomalies.
Part 4: Pipeline function. Wrap all three parts into a single run_analysis(engine) function. It should print progress messages, handle exceptions with try/except, and print a final summary showing how many rows were written to each output table.
Phase 4 Complete
Five posts. From basic SELECT statements to joining three tables, subqueries, CTEs, window functions, and full Python-SQL integration.
SQL is not a separate subject from data science. It is the first tool you use in almost every analysis. The data lives in a database. SQL is how you get it out.
Phase 5 next. Three posts covering Git, GitHub, Jupyter, and Google Colab. Short phase. High leverage. These are the tools that make your work reproducible, shareable, and professional.