Why This Matters (From Real Data Platforms)
In large data platforms, empty data is often more dangerous than bad data.
A daily pipeline that “runs successfully” but ingests zero rows can quietly break downstream reports, fraud models, and executive dashboards. In one real-world scenario, a missing partition in an orders fact table propagated all the way to a revenue report — the pipeline was green, but business numbers were wrong for an entire day.
This is why mature data platforms treat data availability as a first-class validation, not an afterthought.
By combining:
- Database-level guarantees (stored procedures)
- Application-level orchestration (Python)
- Operational enforcement (Airflow)
we build pipelines that fail loudly, alert early, and recover predictably.
Modern data pipelines often start with legacy stored procedures, but as data engineering teams scale, moving SQL logic into Python + Airflow pipelines provides better orchestration, observability, and validation.
In this article, we will walk through an industry-standard pipeline for a daily customer orders snapshot (Oracle-based) using:
- Schemas: `CO` (customers, stores, products) and `OE` (orders, order_items)
- Partitioned tables: Daily partitions for large fact tables
- Validation checks: Fail if no rows exist, send emails
- Airflow orchestration: Automated daily runs with success/failure notifications
showing how responsibilities are cleanly split across layers.
1️⃣ Sample Stored Procedure (Legacy Approach)
CREATE OR REPLACE PROCEDURE sp_daily_customer_orders_snapshot (
p_partition_date IN VARCHAR2 DEFAULT NULL
) AS
v_partition_date VARCHAR2(8);
BEGIN
v_partition_date := COALESCE(p_partition_date, TO_CHAR(TRUNC(SYSDATE)-1, 'YYYYMMDD'));
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE dw.daily_customer_orders';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN RAISE; END IF;
END;
EXECUTE IMMEDIATE '
CREATE TABLE dw.daily_customer_orders AS
SELECT /*+ PARALLEL(8) */
c.customer_id,
c.customer_name,
s.store_id,
s.store_name,
o.order_id,
o.order_date,
oi.product_id,
p.product_name,
oi.quantity,
oi.unit_price,
(oi.quantity * oi.unit_price) AS total_amount,
CASE
WHEN (oi.quantity * oi.unit_price) >= 1000 THEN ''High''
WHEN (oi.quantity * oi.unit_price) >= 500 THEN ''Medium''
ELSE ''Low''
END AS order_value_category
FROM oe.orders_fct PARTITION (P' || v_partition_date || ') o
INNER JOIN oe.order_items_fct PARTITION (P' || v_partition_date || ') oi
ON o.order_id = oi.order_id
INNER JOIN co.customers_dim c
ON o.customer_id = c.customer_id
INNER JOIN co.stores_dim s
ON o.store_id = s.store_id
INNER JOIN co.products_dim p
ON oi.product_id = p.product_id
WHERE o.order_status = ''COMPLETED''
';
COMMIT;
END sp_daily_customer_orders_snapshot;
/
Assumption: The above procedure is called by Airflow.
How it’s typically run
- Airflow (or Control-M / cron) calls the procedure (a minimal sketch follows this list)
- Logs go to `DBMS_OUTPUT`
- If the table is empty, the procedure still “succeeds”
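For illustration, here is a minimal sketch of such a caller task. The connection id and task wiring are assumptions, not taken from a real deployment; the point is that the caller has no idea whether the snapshot is empty.

```python
# Minimal sketch (assumed oracle_conn_id): an Airflow task that simply calls
# the legacy procedure. It cannot tell an empty snapshot from a healthy one.
from airflow.operators.python import PythonOperator
from airflow.providers.oracle.hooks.oracle import OracleHook

def call_snapshot_procedure(**context):
    hook = OracleHook(oracle_conn_id="oracle_default")  # assumed connection id
    conn = hook.get_conn()
    try:
        with conn.cursor() as cursor:
            # Pass the run date; any DBMS_OUTPUT stays on the database side
            cursor.callproc("sp_daily_customer_orders_snapshot", [context["ds_nodash"]])
        conn.commit()
    finally:
        conn.close()

# Inside a DAG definition:
call_proc = PythonOperator(
    task_id="call_snapshot_procedure",
    python_callable=call_snapshot_procedure,
)
```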
Limitations of this approach:
- Hard to monitor & test
- No automated alerts
- No validation if the table is empty
- Tightly coupled to Oracle
2️⃣ Refactored Python + Airflow Approach (Modern Approach)
The best pattern today is separation of concerns.
The golden rule
Oracle does heavy SQL. Python + Airflow do orchestration, validation, and alerting.
You do not rewrite SQL into pandas.
You move control, not computation.
Moving SQL execution into Python allows for dynamic orchestration, validation, and alerting.
So Python becomes the conductor, Oracle remains the engine.
Responsibility split:
| Layer | Responsibility |
|---|---|
| SQL files | Transformations (CTAS, joins, analytics) |
| Python | Parameters, validation, logging |
| Airflow | Scheduling, retries, emails, observability |
| Oracle | Execution engine |
2.1 Externalize SQL (Keep it optimized)
SQL File: daily_customer_orders.sql
-- File: sql/daily_customer_orders.sql
CREATE TABLE dw.daily_customer_orders AS
SELECT /*+ PARALLEL(8) */
c.customer_id,
c.customer_name,
s.store_id,
s.store_name,
o.order_id,
o.order_date,
oi.product_id,
p.product_name,
oi.quantity,
oi.unit_price,
(oi.quantity * oi.unit_price) AS total_amount,
CASE
WHEN (oi.quantity * oi.unit_price) >= 1000 THEN 'High'
WHEN (oi.quantity * oi.unit_price) >= 500 THEN 'Medium'
ELSE 'Low'
END AS order_value_category
FROM oe.orders_fct PARTITION (P{{ partition_date }}) o
INNER JOIN oe.order_items_fct PARTITION (P{{ partition_date }}) oi
ON o.order_id = oi.order_id
INNER JOIN co.customers_dim c
ON o.customer_id = c.customer_id
INNER JOIN co.stores_dim s
ON o.store_id = s.store_id
INNER JOIN co.products_dim p
ON oi.product_id = p.product_id
WHERE o.order_status = 'COMPLETED';
Key idea:
- SQL remains pure Oracle
- Partition is injected dynamically
> Placeholder `{{ partition_date }}` will be replaced dynamically in Python.
2.2 Python Execution Script
Design Principle
Oracle CTAS does not fail on empty results: a CREATE TABLE AS SELECT that matches zero rows still creates an empty table and reports success.
So we explicitly:
- Run CTAS
- Immediately SELECT COUNT(*)
- If count = 0:
  - Raise a custom exception
  - Airflow handles alerting
Once any step returns zero rows -> exception raised -> task fails.
scripts/daily_customer_orders_pipeline.py
# File: scripts/daily_customer_orders_pipeline.py
import logging

log = logging.getLogger(__name__)


class EmptyTableError(Exception):
    """Raised when the target table is empty."""


def validate_rowcount(cursor, table_name: str):
    """Fail fast if the freshly built table contains no rows."""
    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
    count = cursor.fetchone()[0]
    log.info("Row count for %s: %d", table_name, count)
    if count == 0:
        raise EmptyTableError(f"{table_name} has NO DATA!")


def run_daily_orders(conn, partition_date: str):
    cursor = conn.cursor()

    # Drop the existing snapshot table; ignore "table does not exist"
    try:
        cursor.execute("DROP TABLE dw.daily_customer_orders PURGE")
        log.info("Dropped existing table")
    except Exception as e:
        if "ORA-00942" in str(e):
            log.info("Table does not exist")
        else:
            raise

    # Read the SQL file and inject the partition date
    sql_file = "/opt/airflow/sql/daily_customer_orders.sql"
    with open(sql_file, "r") as f:
        sql = f.read().replace("{{ partition_date }}", partition_date)
    # cx_Oracle rejects a trailing semicolon on a single statement, so strip it
    sql = sql.strip().rstrip(";")

    # Execute the CTAS
    cursor.execute(sql)

    # Validate row count on the table we just created
    validate_rowcount(cursor, "dw.daily_customer_orders")

    conn.commit()
    log.info("Snapshot created successfully for partition %s", partition_date)
Why this matters
- CTAS with zero rows is treated as a failure
- Bad data never silently propagates
- Validation is reusable across pipelines (a standalone sketch follows)
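For example, here is a minimal sketch of reusing the same function and check outside Airflow, say for an ad-hoc backfill of one partition. The credentials and DSN are placeholders, not real values.

```python
# Minimal sketch (placeholder credentials/DSN): run the pipeline by hand,
# e.g. to backfill one partition, with the same empty-table check applied.
import cx_Oracle
from scripts.daily_customer_orders_pipeline import run_daily_orders, EmptyTableError

conn = cx_Oracle.connect("dw_user", "change_me", "db-host:1521/ORCLPDB1")
try:
    run_daily_orders(conn, "20240101")
except EmptyTableError as exc:
    print(f"Backfill aborted: {exc}")
finally:
    conn.close()
```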
2.3 Airflow DAG with success & failure emails
dags/daily_customer_orders_dag.py
# File: dags/daily_customer_orders_dag.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.oracle.hooks.oracle import OracleHook
from airflow.utils.email import send_email

from scripts.daily_customer_orders_pipeline import run_daily_orders, EmptyTableError

default_args = {
    "owner": "data-engineering",
    "email": ["data-alerts@company.com"],
    "email_on_failure": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def notify_success(context):
    # Airflow has no email_on_success flag, so success emails come from a callback
    send_email(
        to=default_args["email"],
        subject=f"Success: {context['dag'].dag_id} ({context['ds']})",
        html_content="Daily customer orders snapshot completed successfully.",
    )


def run_pipeline(**context):
    partition_date = context["ds_nodash"]  # YYYYMMDD
    hook = OracleHook(oracle_conn_id="oracle_default")
    conn = hook.get_conn()
    try:
        run_daily_orders(conn, partition_date)
    except EmptyTableError as e:
        # Fail the task so Airflow sends the failure alert and stops the DAG
        raise RuntimeError(str(e)) from e
    finally:
        conn.close()


with DAG(
    dag_id="daily_customer_orders_snapshot",
    default_args=default_args,
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
) as dag:

    task_run_snapshot = PythonOperator(
        task_id="run_daily_customer_orders_snapshot",
        python_callable=run_pipeline,
        on_success_callback=notify_success,
    )
What happens at runtime
- Success case
  - Table created
  - Row count > 0
  - Task succeeds
  - Success email sent
  - Downstream tasks can run
- Failure case (empty table)
  - Validation fails
  - Task errors intentionally
  - Failure email sent
  - DAG stops
  - Issue visible immediately
This is data quality enforcement, not just scheduling.
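To make the downstream point concrete, a hypothetical report-refresh task (the name and operator choice are illustrative) chained after the snapshot task simply never runs on an empty-data day:

```python
# Sketch (hypothetical downstream task), placed inside the same `with DAG(...)`
# block: because run_pipeline raises on empty data, anything chained after the
# snapshot task does not run on bad days (it is marked upstream_failed).
from airflow.operators.empty import EmptyOperator

refresh_revenue_report = EmptyOperator(task_id="refresh_revenue_report")

task_run_snapshot >> refresh_revenue_report
```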
3️⃣ Key Features
- Partitioned Table Handling: Dynamic `partition_date` via the Airflow execution date
- Row-Count Validation: Task fails if no data → triggers email alert
- Success/Failure Notifications: Automatic emails from Airflow
- Idempotent: Drops the old table before each snapshot
- Performance Optimized: Oracle `PARALLEL` hint for large datasets
Airflow will send something like:
Task Failed: run_daily_customer_orders_snapshot
Exception: dw.daily_customer_orders has NO DATA!
Execution date: 2026-02-03
Log URL: ...
This is far more actionable than DBMS_OUTPUT.
4️⃣ Pros & Cons
| Approach | Pros | Cons |
|---|---|---|
| Stored Procedure | Fast, data-local | Hard to monitor, test, alert |
| Python + Airflow | Observability, retries, email alerts, validation, partitioned pipeline | Requires setup & orchestration |
5️⃣ Target Architecture
airflow/
├── dags/
│ └── daily_customer_orders_dag.py
├── scripts/
│ └── daily_customer_orders_pipeline.py
└── sql/
    └── daily_customer_orders.sql
6️⃣ Takeaways
Stored procedures are not “bad”. They are just not orchestration tools. There is no universal winner between stored procedures and SQL-in-Python — strong pipelines use both, intentionally.
- Keep heavy SQL in Oracle; move orchestration, validation, and monitoring to Python + Airflow.
- Implement row-count validation and alerts to catch empty or failed datasets early.
- Modular architecture allows scaling: more tables, incremental loads, retries, and notifications.
Recommendations
You can refactor and enhance for different use cases:
- Incremental Loads: Instead of a full snapshot, insert only new records.
- Branching DAGs: Skip downstream tasks if no data (a minimal sketch follows).
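As a minimal sketch of the branching idea (task names are illustrative, reusing the same Oracle connection id as above), a ShortCircuitOperator can check the row count and skip everything downstream instead of failing the whole DAG:

```python
# Sketch (illustrative task names): skip downstream tasks when the snapshot is
# empty, rather than failing the DAG.
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.oracle.hooks.oracle import OracleHook

def snapshot_has_rows(**context):
    hook = OracleHook(oracle_conn_id="oracle_default")
    count = hook.get_first("SELECT COUNT(*) FROM dw.daily_customer_orders")[0]
    return count > 0  # returning False skips all downstream tasks

check_snapshot_has_rows = ShortCircuitOperator(
    task_id="check_snapshot_has_rows",
    python_callable=snapshot_has_rows,
)
```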
