You need to orchestrate a data pipeline: extract from an API, transform, load into a warehouse, send a Slack notification. Airflow asks for a DAG file, a scheduler, a web server, a metadata database, and often a Kubernetes cluster. Prefect lets you add two decorators to your existing Python functions and you're done.
What Prefect Actually Does
Prefect is a Python workflow orchestration framework. You decorate your existing Python functions with @flow and @task, and Prefect handles scheduling, retries, logging, dependency management, caching, and observability. No DAGs to define. No configuration files. Your Python code IS the workflow.
Prefect 2/3 replaced Airflow's declarative DAG model with an imperative Python-first approach. You write normal Python — if/else, loops, try/except — and Prefect tracks execution, handles failures, and provides a beautiful dashboard.
Self-hosted (free, open-source Apache 2.0) or Prefect Cloud (free tier: 5,000 task runs/month with unlimited users). The Cloud dashboard is the same one enterprises use.
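Either way, the dashboard is one command away. A minimal self-hosted setup (assuming `prefect` is already installed):

```shell
# Start the local Prefect server + UI (default: http://127.0.0.1:4200)
prefect server start

# Or authenticate against Prefect Cloud instead
prefect cloud login
```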
Quick Start
```shell
pip install prefect
```
Turn any Python script into an orchestrated workflow:
```python
from prefect import flow, task

@task(retries=3, retry_delay_seconds=60)
def extract_data(url: str) -> dict:
    import requests
    response = requests.get(url)
    response.raise_for_status()
    return response.json()

@task
def transform_data(raw: dict) -> list:
    return [{
        'name': item['name'],
        'value': item['price'] * item['quantity']
    } for item in raw['items']]

@task
def load_data(records: list) -> int:
    import sqlalchemy
    engine = sqlalchemy.create_engine('postgresql://...')
    with engine.begin() as conn:  # begin() commits the transaction on exit
        for r in records:
            conn.execute(sqlalchemy.text(
                'INSERT INTO sales (name, value) VALUES (:name, :value)'
            ), r)
    return len(records)

@flow(name='daily-etl')
def etl_pipeline():
    raw = extract_data('https://api.example.com/sales')
    clean = transform_data(raw)
    count = load_data(clean)
    print(f'Loaded {count} records')

# Run locally
etl_pipeline()
```
Schedule it. The block-based `Deployment.build_from_flow` API was removed in Prefect 3; the version-stable option is `flow.serve` (Prefect 2.10+ and 3.x), which registers a deployment and keeps a long-running process polling for scheduled runs:

```python
if __name__ == '__main__':
    # Registers a 'daily-sales-etl' deployment and serves it on a cron schedule
    etl_pipeline.serve(name='daily-sales-etl', cron='0 6 * * *')  # 6am daily
```
3 Practical Use Cases
1. Conditional Branching
```python
@flow
def process_order(order: dict):
    validated = validate_order(order)
    if validated['total'] > 1000:
        approval = get_manager_approval(order)
        if not approval:
            send_rejection_email(order)
            return
    payment = charge_payment(order)
    if payment['status'] == 'failed':
        retry_with_backup_processor(order)
        return
    ship_order(order)
    send_confirmation(order)
```
Normal Python control flow. Prefect tracks every branch.
2. Parallel Task Execution
```python
from prefect import flow, task

@task
def process_file(path: str) -> dict:
    # Heavy processing (count_rows is a placeholder for real work)
    return {'path': path, 'rows': count_rows(path)}

@flow
def batch_process(files: list[str]):
    # Submit all tasks in parallel
    futures = [process_file.submit(f) for f in files]
    results = [f.result() for f in futures]
    total = sum(r['rows'] for r in results)
    print(f'Processed {total} total rows from {len(files)} files')
```
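If the submit/result pattern looks familiar, that's because it mirrors the standard library's `concurrent.futures`. A rough plain-Python analogy of the flow above (no Prefect required; `len(path)` stands in for real row counting):

```python
from concurrent.futures import ThreadPoolExecutor

def process_file(path: str) -> dict:
    # Stand-in for heavy per-file work
    return {'path': path, 'rows': len(path)}

def batch_process(files: list[str]) -> int:
    with ThreadPoolExecutor() as pool:
        # submit() returns futures immediately; result() blocks until done
        futures = [pool.submit(process_file, f) for f in files]
        results = [f.result() for f in futures]
    return sum(r['rows'] for r in results)

print(batch_process(['a.csv', 'bb.csv']))  # → 11
```

The difference: Prefect's futures come with retries, logging, and dashboard visibility for free.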
3. Caching Expensive Operations
```python
import requests
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def expensive_api_call(query: str) -> dict:
    # Cached for 1 hour: the same input returns the cached result
    return requests.get(f'https://expensive-api.com/search?q={query}').json()
```
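`task_input_hash` derives the cache key from the task's inputs, so identical arguments hit the cache while new arguments miss it. Conceptually it works like this hand-rolled sketch (not Prefect's actual implementation):

```python
import hashlib
import json

def input_cache_key(args: tuple, kwargs: dict) -> str:
    # Stable hash over serialized inputs: same inputs -> same key
    payload = json.dumps({'args': args, 'kwargs': kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

assert input_cache_key(('python',), {}) == input_cache_key(('python',), {})
assert input_cache_key(('python',), {}) != input_cache_key(('rust',), {})
```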
Why This Matters
Prefect makes workflow orchestration accessible to any Python developer. No Airflow infrastructure to maintain, no DAG files to manage, no learning curve beyond two decorators. For data engineers, ML engineers, and backend developers who need reliable scheduled workflows, Prefect is the modern answer.
Need custom data extraction or web scraping solutions? I build production-grade scrapers and data pipelines. Check out my Apify actors or email me at spinov001@gmail.com for custom projects.
Follow me for more free API discoveries every week!