DEV Community

Cover image for Building an End-to-End Data Engineering Pipeline with DuckDB and Python
Satyam Gupta
Satyam Gupta

Posted on

Building an End-to-End Data Engineering Pipeline with DuckDB and Python

Tldr: An end‑to‑end data engineering + analytics walkthrough that takes a public dataset from raw → cleaned → star schema (fact + dims) → data quality checks → business marts → charts using a single Jupyter notebook.

This article is also available on Medium and LinkedIn, but here I’ve included more code.

Data engineering isn’t just about moving data around — it’s about building pipelines that make data usable. In this tutorial, I’ll walk through how I turned a raw dataset into BI-ready marts and visualizations, all inside a single Jupyter Notebook.

We’ll follow the Medallion Architecture pattern:

  • Bronze → Raw data
  • Silver → Cleaned and standardized
  • Gold → Star schema (fact + dimensions)
  • QA → Quality checks
  • Marts → Business-friendly aggregates
  • Visualization → Final charts

👉 Full repo: https://github.com/Satyam-gupta20/data-engineering-endToend

Ingestion (Bronze Layer)

At this stage, everything is “as-is” — messy but captured.

train_ds = load_dataset('Hacker0x01/disclosed_reports', split = "train").to_pandas()
test_ds = load_dataset('Hacker0x01/disclosed_reports', split = 'test').to_pandas()
validate_ds = load_dataset('Hacker0x01/disclosed_reports', split = 'validation').to_pandas()

df = pd.concat([train_ds, test_ds, validate_ds], ignore_index = True)

# for any null dict fields, add {} to them instead of them being blank
for c in ['reporter', 'team', 'weakness', 'structured_scope']:
  df[c] = df[c].apply(lambda x : x if isinstance(x, dict) else {})

  df[c + "_json"] = df[c].apply(
      lambda d : json.dumps(
          {k : (v.tolist() if isinstance(v, np.ndarray) else v) for k,v in d.items()},
          sort_keys = True,
          ensure_ascii =  False
      )
  )

#convert date fields to datetime
df['created_at'] = pd.to_datetime(df.get("created_at"), errors = "coerce")
df['disclosed_at'] = pd.to_datetime(df.get("disclosed_at"), errors = "coerce")

bronze_cols = [ "id", "title", "created_at", "disclosed_at", "substate", "visibility", "has_bounty?", "vote_count",
               "original_report_id", "reporter_json", "team_json", "weakness_json", "structured_scope_json", "vulnerability_information" ]

bronze = df[[c for c in bronze_cols if c in df.columns]]
bronze.to_parquet("bronze_hackerone_reports.parquet", index = False)
bronze.to_csv("raw_data.csv")
Enter fullscreen mode Exit fullscreen mode

Transformation (Silver Layer)

Silver = cleaned, consistent, ready for modeling.

# create staging table with clean/standardized scalar columns
con.sql("""
    CREATE OR REPLACE TABLE stg_reports AS
    SELECT
    CAST(id as BIGINT) as report_id,
    title,
    LOWER(NULLIF(substate,'')) as substate, --normalised casing
    visibility,
    "has_bounty?" as has_bounty,
    CAST(vote_count AS INTEGER) as vote_count,
    CAST(created_at AS TIMESTAMP) as created_at,
    CAST(disclosed_at AS TIMESTAMP) as disclosed_at,
    CAST(original_report_id AS BIGINT) as original_report_id,
    reporter_json,
    weakness_json,
    team_json,
    structured_scope_json,
    vulnerability_information
    FROM bronze;
""")

#stage normalised tables ie, flatten JSON into typed columns
con.sql("""
    CREATE OR REPLACE TABLE stg_reporter AS
    SELECT DISTINCT
    reporter_json,
    json_extract_string(reporter_json, '$.username') as username,
    CAST(json_extract(reporter_json, '$.verified') AS BOOLEAN) as verified
    from bronze;
""")

con.sql("""
    CREATE OR REPLACE TABLE stg_team AS
    SELECT DISTINCT
    team_json,
    json_extract_string(team_json, '$.handle') as handle,
    CAST(json_extract(team_json, '$.id') AS BIGINT) as id,
    CAST(json_extract(team_json,'$.offers_bounties') AS BOOLEAN) AS offers_bounties
    from bronze;
""")

con.sql("""
    CREATE OR REPLACE TABLE stg_weakness AS
    SELECT DISTINCT
    weakness_json,
    json_extract_string(weakness_json,'$.name') AS weakness_name,
    CAST(json_extract(weakness_json, '$.id') AS BIGINT) as id,
    FROM bronze;
""")

con.sql("""
CREATE OR REPLACE TABLE stg_asset AS
SELECT DISTINCT
  structured_scope_json,
  json_extract_string(structured_scope_json,'$.asset_identifier') AS asset_identifier,
  json_extract_string(structured_scope_json,'$.asset_type')       AS asset_type,
  json_extract_string(structured_scope_json,'$.max_severity')     AS max_severity
FROM bronze;
""")
Enter fullscreen mode Exit fullscreen mode

Star Schema (Gold Layer)

We separate fact and dimension tables:

Goal: Build a Source of Truth model (star schema) with conformed dimensions and a single fact table.

Steps:

  1. Generate Surrogate Keys:
  2. Apply hash(JSON) on each entity’s raw JSON to generate a stable, privacy-safe surrogate key (reporter_id, team_id, weakness_id, asset_id).
  3. Why: Maintains join consistency across all downstream systems while protecting sensitive identifiers (handles, usernames, etc.).

  4. Dimension Tables:

  5. dim_reporter, dim_team, dim_weakness, dim_structured_scope — one row per unique entity, keyed by surrogate ID.

  6. Conformed (consistent) across all marts and use cases.

  7. Fact Table:

  8. fact_report — one row per report, with:

  9. Natural key (report_id)

  10. Foreign keys to all four dimensions

  11. Core measures and attributes (has_bounty, vote_count, created_at, disclosed_at, substate)

Why Star Schema:

  • Easy to query for BI tools (Looker/Tableau/Power BI).

  • Clear separation of measures (facts) and descriptive attributes (dims).

  • Facilitates incremental loads and SCD handling in production.

How we do this in production:

  • Generate surrogate keys once in a controlled transformation job to guarantee stability.
  • Enforce PK/FK relationships via schema constraints or dbt tests.
  • Store Gold layer in a governed warehouse (Snowflake/BigQuery/Redshift) with strict access control, making it the single source of truth for all analytics & AI workloads.

AI tie-in: Clean, normalized entity attributes make it easier to build safe, PII-free ML feature sets downstream.

con.sql("""
    CREATE OR REPLACE TABLE dim_reporter as
    SELECT DISTINCT
    hash(reporter_json) as reporter_id, -- surrogate_key
    username,
    verified
    FROM stg_reporter;
""")

con.sql("""
    CREATE OR REPLACE TABLE dim_team as
    SELECT DISTINCT
    hash(team_json) as team_id, -- surrogate_key
    id,
    handle,
    offers_bounties
    FROM stg_team;
""")

con.sql("""
    CREATE OR REPLACE TABLE dim_weakness as
    SELECT DISTINCT
    hash(weakness_json) as weakness_id, -- surrogate_key
    id,
    weakness_name
    FROM stg_weakness;
""")

con.sql("""
    CREATE OR REPLACE TABLE dim_structured_scope as
    SELECT DISTINCT
    hash(structured_scope_json) as asset_id, -- surrogate_key
    asset_identifier,
    asset_type,
    max_severity
    FROM stg_asset;
""")
Enter fullscreen mode Exit fullscreen mode

QA Layer

Goal: Validate that Gold layer tables meet data integrity, completeness, and consistency standards before they are exposed to BI tools or AI models.

Steps:

  1. Record Counts:
  2. Ensure no unexpected row loss or duplication between Bronze → Silver → Gold.
  3. Example: COUNT(DISTINCT report_id) in fact_report should match original dataset count (minus intentional filters).

  4. Key Integrity Checks:

  5. All fact_report foreign keys (reporter_id, team_id, weakness_id, asset_id) must exist in their respective dimension tables.

  6. Null and Data Type Checks:

  7. Confirm mandatory fields (e.g., created_at, substate) are non-null.

  8. Verify correct data types for dates, booleans, and integers.

  9. Referential Consistency:

  10. No orphaned dimension entries (dims without a matching fact) unless intentional for slowly changing dimensions.

Why it matters:

  • Guarantees trust in the Source of Truth.
  • Prevents BI dashboards or ML pipelines from producing misleading insights.

How we do this in production:

  • Use dbt tests (unique, not_null, relationships) or Great Expectations to automate QA.
  • Set up CI/CD checks so broken data never reaches production.
  • Implement data quality alerts (Slack/Email) when thresholds fail.

Phase 5 – Aggregation & Marts (Analytics Layer)

Goal: Create pre-aggregated, business-friendly datasets optimized for consumption by BI tools, APIs, and AI feature stores.

Why Marts Matter:

  • Simplify queries for analysts and business users.
  • Improve dashboard performance by avoiding heavy aggregations at runtime.
  • Provide feature-ready datasets for AI/ML models.

How we do this in production:

  • Create materialized views or incremental tables in the warehouse.
  • Store in BI schema separate from operational schemas.
  • Automate refreshes using orchestration tools (Airflow/Prefect) on a schedule or event trigger.
  • For AI, register these marts in a feature store (Feast/Tecton) so ML teams can use them without re-engineering features.

Key Takeaways

  • DuckDB is a game-changer for local SQL + analytics.
  • The Medallion approach keeps data modeling organized.
  • Star schemas still matter — they power BI/analytics-friendly datasets.

👉 Full notebook: GitHub – data-engineering-end-to-end

Would love feedback: What tools do you use for building marts — DuckDB, dbt, or something else?

Top comments (0)