DEV Community

Max Klein
How to Build an ETL Pipeline with Python and MySQL

In today’s data-driven world, extracting, transforming, and loading (ETL) data is a cornerstone of modern analytics and application development. Whether you’re aggregating customer data, preparing datasets for machine learning, or building a data warehouse, ETL pipelines are the backbone of every robust data infrastructure.

This tutorial will guide you through building a complete ETL pipeline using Python and MySQL—two of the most widely used tools in the data ecosystem. By the end of this article, you’ll have a working ETL pipeline that extracts data from a source (like a CSV file), transforms it using Python, and loads it into a MySQL database. We’ll cover everything from setup to implementation, with practical code examples and actionable best practices.


Prerequisites

Before diving into the tutorial, ensure your environment meets the following requirements:

Software and Tools

  • Python 3.8+ installed on your machine (check with python --version)
  • MySQL 8.0+ (you can use MySQL Community Server or a managed service like AWS RDS)
  • pip (Python’s package manager; install via get-pip.py if needed)

Python Packages

Install the following Python libraries using pip:

pip install mysql-connector-python pandas

MySQL Setup

  1. Install and configure MySQL on your machine or cloud provider.
  2. Create a new database (e.g., etl_demo) and a user with access privileges.
  3. Note the host, username, password, and database name for later use.
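For reference, steps 2 and 3 boil down to a few SQL statements run as a MySQL admin user. The user name and password below are placeholders — substitute your own:

```sql
CREATE DATABASE etl_demo;
CREATE USER 'etl_user'@'localhost' IDENTIFIED BY 'your_password';
GRANT ALL PRIVILEGES ON etl_demo.* TO 'etl_user'@'localhost';
FLUSH PRIVILEGES;
```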

Step 1: Understanding the ETL Pipeline

An ETL pipeline consists of three core stages:

Extract

Retrieve raw data from its source (e.g., CSV, API, or another database). This step focuses on data discovery and retrieval.

Transform

Clean, filter, and structure the data into a format suitable for storage or analysis. This might include type conversions, deduplication, or aggregation.

Load

Insert the transformed data into the target system—in this case, a MySQL database.


Step 2: Setting Up Your Project Structure

Create a project directory with the following files:

etl_pipeline/
│
├── data/
│   └── raw_data.csv
├── transform.py
├── load.py
└── etl_pipeline.py
  • raw_data.csv: A sample CSV file containing raw data (we’ll create this in the next section).
  • transform.py: Handles data transformation logic.
  • load.py: Manages data loading into MySQL.
  • etl_pipeline.py: Orchestrates the ETL process.

Step 3: Extracting Data from a CSV File

For this tutorial, we’ll use a simple CSV file as the source of raw data. Here’s an example of what data/raw_data.csv might look like:

id,name,age,email
1,John Doe,30,john@example.com
2,Jane Smith,25,jane@example.com
3,Bob Johnson,35,bob@example.com
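If you'd rather generate the sample file than type it by hand, a short script (assuming the data/ directory layout from Step 2) can write it for you:

```python
import csv
import os

# Sample rows matching the columns used throughout this tutorial
rows = [
    {"id": 1, "name": "John Doe", "age": 30, "email": "john@example.com"},
    {"id": 2, "name": "Jane Smith", "age": 25, "email": "jane@example.com"},
    {"id": 3, "name": "Bob Johnson", "age": 35, "email": "bob@example.com"},
]

os.makedirs("data", exist_ok=True)
with open("data/raw_data.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "name", "age", "email"])
    writer.writeheader()
    writer.writerows(rows)
```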

Code Example: Extracting Data

In transform.py, start by reading the CSV file with Pandas:

import pandas as pd

def extract_data(file_path):
    try:
        # Read CSV into a DataFrame
        df = pd.read_csv(file_path)
        print("✅ Data extracted successfully.")
        return df
    except FileNotFoundError:
        print("❌ Error: File not found.")
        return None
    except Exception as e:
        print(f"❌ Error during extraction: {e}")
        return None

Tip: Always handle exceptions during extraction to avoid pipeline failures.


Step 4: Transforming the Data

Once the data is extracted, we need to clean and prepare it. Common transformations include:

  • Removing duplicates
  • Validating data types
  • Filtering out invalid records
  • Standardizing formats (e.g., email validation)

Code Example: Transforming Data

Update transform.py with the following transformation logic:

import pandas as pd
import re

def transform_data(df):
    if df is None:
        return None

    try:
        # Remove duplicate entries (keep the first occurrence of each email)
        df = df.drop_duplicates(subset=['email'])

        # Validate and clean email addresses; non-string values
        # (e.g. NaN from missing cells) fail validation instead of crashing
        def is_valid_email(email):
            return isinstance(email, str) and re.match(r"[^@]+@[^@]+\.[^@]+", email) is not None

        # .copy() avoids pandas' SettingWithCopyWarning on the assignment below
        df = df[df['email'].apply(is_valid_email)].copy()

        # Convert age to numeric, then keep only plausible values
        df['age'] = pd.to_numeric(df['age'], errors='coerce')
        df = df[df['age'].between(18, 100)]

        print("✅ Data transformed successfully.")
        return df
    except Exception as e:
        print(f"❌ Error during transformation: {e}")
        return None

Warning: Avoid using df.dropna() without understanding the impact on your dataset. Always validate data carefully.
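To make the difference concrete, here is a small sketch: a targeted dropna restricted to the columns the pipeline actually depends on keeps rows that a blanket dropna would silently discard.

```python
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3],
    "email": ["a@example.com", None, "c@example.com"],
    "nickname": [None, None, "bob"],  # optional column, often empty
})

# Blanket dropna: drops every row with ANY missing value
blanket = df.dropna()                   # only row 3 survives

# Targeted dropna: only require the columns that matter downstream
targeted = df.dropna(subset=["email"])  # rows 1 and 3 survive

print(len(blanket), len(targeted))  # 1 2
```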


Step 5: Loading Data into MySQL

Now that the data is clean, it’s time to load it into a MySQL database. We’ll use the mysql-connector-python library to handle this.

Step 5.1: Configuring MySQL Connection

Create a config.py file to store your database credentials:

# config.py
DB_CONFIG = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'etl_demo'
}
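Hardcoding credentials is fine for a demo, but avoid it in anything shared or deployed. One common pattern is to read them from environment variables, with the demo values as fallbacks:

```python
# config.py — credentials from environment variables, with demo fallbacks
import os

DB_CONFIG = {
    'host': os.environ.get('DB_HOST', 'localhost'),
    'user': os.environ.get('DB_USER', 'your_username'),
    'password': os.environ.get('DB_PASSWORD', 'your_password'),
    'database': os.environ.get('DB_NAME', 'etl_demo'),
}
```

Remember to keep config.py (or at least any file with real credentials) out of version control.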

Step 5.2: Inserting Data into MySQL

In load.py, write a function to insert the transformed DataFrame into a MySQL table:

import mysql.connector
from mysql.connector import Error
import config

def load_data(df, table_name):
    if df is None or df.empty:
        print("❌ No data to load.")
        return

    connection = None  # defined up front so the finally block is safe
    try:
        # Establish database connection
        connection = mysql.connector.connect(**config.DB_CONFIG)
        cursor = connection.cursor()

        # Create table if it doesn't exist
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            id INT PRIMARY KEY,
            name VARCHAR(255),
            age INT,
            email VARCHAR(255)
        )
        """
        cursor.execute(create_table_query)

        # Insert rows with a parameterized query; cast NumPy scalars to
        # native Python types, which the connector can serialize
        insert_query = f"""
        INSERT INTO {table_name} (id, name, age, email)
        VALUES (%s, %s, %s, %s)
        """
        rows = [
            (int(r.id), str(r.name), int(r.age), str(r.email))
            for r in df.itertuples(index=False)
        ]
        cursor.executemany(insert_query, rows)

        # Commit the transaction
        connection.commit()
        print("✅ Data loaded successfully into MySQL.")

    except Error as e:
        print(f"❌ MySQL error: {e}")
    except Exception as e:
        print(f"❌ Error during loading: {e}")
    finally:
        if connection is not None and connection.is_connected():
            cursor.close()
            connection.close()
            print("🔒 MySQL connection closed.")

Best Practice: Use parameterized queries (like %s) to prevent SQL injection attacks.
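To see why this matters, here is a self-contained demonstration using Python's built-in sqlite3 driver (which uses ? placeholders instead of MySQL's %s, but the principle is identical): the parameterized query treats a malicious input as a literal value, while string interpolation lets it rewrite the query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE users (id INTEGER, name TEXT)")
cur.execute("INSERT INTO users VALUES (1, 'John Doe')")
conn.commit()

malicious = "nobody' OR '1'='1"

# Unsafe: string interpolation lets the input become part of the SQL
unsafe = cur.execute(
    f"SELECT * FROM users WHERE name = '{malicious}'"
).fetchall()

# Safe: the driver escapes the value; no row has this literal name
safe = cur.execute(
    "SELECT * FROM users WHERE name = ?", (malicious,)
).fetchall()

print(len(unsafe), len(safe))  # 1 0 — the unsafe query matched every row
```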


Step 6: Orchestrating the ETL Pipeline

Now that all components are in place, create an etl_pipeline.py file to tie them together:

from transform import extract_data, transform_data
from load import load_data

def main():
    # Configuration
    CSV_FILE_PATH = 'data/raw_data.csv'
    TABLE_NAME = 'users'

    # Extract
    df = extract_data(CSV_FILE_PATH)
    if df is None:
        return

    # Transform
    df = transform_data(df)
    if df is None:
        return

    # Load
    load_data(df, TABLE_NAME)

if __name__ == "__main__":
    main()

Tip: For production systems, consider using a task scheduler (e.g., cron, Airflow) to automate pipeline execution.
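As a concrete example, a crontab entry like the following (paths are placeholders for your environment) would run the pipeline every night at 2 a.m. and append its output to a log file:

```
0 2 * * * cd /path/to/etl_pipeline && /usr/bin/python3 etl_pipeline.py >> etl.log 2>&1
```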


Step 7: Testing the Pipeline

Run the pipeline by executing:

python etl_pipeline.py

If everything worked, query the table in MySQL to confirm the rows were loaded:

SELECT * FROM users;

id | name        | age | email
---+-------------+-----+------------------
 1 | John Doe    |  30 | john@example.com
 2 | Jane Smith  |  25 | jane@example.com
 3 | Bob Johnson |  35 | bob@example.com

Warning: Always verify the data in MySQL after loading to catch any discrepancies.


Best Practices for ETL Pipelines

Here are some essential tips to keep your ETL pipeline robust and scalable:

1. Use Transactions

Wrap multiple inserts in a transaction to ensure atomicity (i.e., all-or-nothing operations).
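The commit/rollback shape looks like this — sketched with Python's built-in sqlite3 driver so it runs anywhere, but the same try/commit/except/rollback pattern applies with mysql-connector-python:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.commit()

# The third row violates the primary key, so the batch cannot fully succeed
rows = [(1, "John Doe"), (2, "Jane Smith"), (1, "Duplicate Id")]

try:
    cur.executemany("INSERT INTO users VALUES (?, ?)", rows)
    conn.commit()    # only reached if every insert succeeded
except sqlite3.IntegrityError:
    conn.rollback()  # undo the partial inserts: all-or-nothing

count = cur.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # 0 — the first two inserts were rolled back too
```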

2. Handle Large Datasets

For large files, use chunking with chunksize in pd.read_csv() to avoid memory overload.
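With chunksize, pd.read_csv returns an iterator of DataFrames instead of loading the whole file at once; each chunk can be transformed and loaded independently. A sketch using an in-memory file for brevity:

```python
import io
import pandas as pd

# Stand-in for a large file on disk
csv_data = io.StringIO(
    "id,name,age,email\n"
    + "".join(f"{i},User {i},30,user{i}@example.com\n" for i in range(10))
)

total = 0
for chunk in pd.read_csv(csv_data, chunksize=4):
    # transform_data(chunk) and load_data(chunk, ...) would go here
    total += len(chunk)

print(total)  # 10 rows processed across chunks of 4, 4, and 2
```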

3. Log Progress and Errors

Implement logging to track pipeline execution and troubleshoot issues quickly.

4. Parameterize Configuration

Store database credentials and file paths in external configuration files instead of hardcoding them.

5. Validate Data Early

Perform data validation during the extraction phase to reduce errors later in the pipeline.


Conclusion

You’ve now built a complete ETL pipeline using Python and MySQL! This pipeline can be extended to handle more complex use cases, such as:

  • Extracting data from APIs or cloud storage services
  • Transforming data using SQL or advanced Python libraries (e.g., NumPy, Dask)
  • Loading data into data warehouses or big data platforms (e.g., Snowflake, BigQuery)

This tutorial serves as a foundation for your ETL journey. As you gain experience, you’ll likely want to explore more advanced tools and frameworks.


Next Steps

Here’s how to take your ETL skills further:

1. Automate with Scheduling Tools

Use Apache Airflow or Celery to schedule and monitor your ETL pipelines.

2. Use Connection Pools

For high-throughput systems, use connection pooling instead of opening a fresh connection per run: mysql-connector-python ships a built-in MySQLConnectionPool, and SQLAlchemy provides pooling out of the box (with PyMySQL or another driver underneath).

3. Implement Data Quality Checks

Integrate tools like Great Expectations or Pandera to enforce data quality rules.

4. Explore Cloud Solutions

Migrate your pipeline to cloud platforms like AWS Glue or Google Dataflow for scalability.

5. Build a Dashboard

Visualize your data using Tableau, Power BI, or Dash to derive actionable insights.


By mastering ETL pipelines, you’ll become a more versatile developer capable of handling complex data workflows. Now go build something amazing!


Need help building data pipelines or extracting web data? N3X1S INTELLIGENCE delivers production-ready scraping and ETL solutions.
