What is Workflow Orchestration?
Workflow orchestration is the process of coordinating, scheduling, and integrating various tasks in a data pipeline. For ML pipelines, this includes:
- Data ingestion and preprocessing
- Feature engineering
- Model training and evaluation
- Model deployment
- Retraining schedules
- Monitoring
Orchestration tools help manage complex dependencies, handle failures gracefully, and provide visibility into pipeline execution.
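At its core, an orchestrator tracks dependencies between tasks and decides what runs when, and what should be skipped after a failure. The sketch below is plain Python, not an orchestration library, and every task name in it is hypothetical; it only illustrates dependency-ordered execution with failure propagation:

```python
# Minimal sketch of what an orchestrator does: run tasks in dependency
# order, and skip downstream work when an upstream task fails.
def run_pipeline(tasks, dependencies):
    """tasks: name -> callable; dependencies: name -> list of upstream names."""
    states = {}

    def run(name):
        if name in states:
            return states[name]
        # Run upstream tasks first; skip this task if any upstream failed
        if any(run(up) != "Completed" for up in dependencies.get(name, [])):
            states[name] = "Skipped"
            return states[name]
        try:
            tasks[name]()
            states[name] = "Completed"
        except Exception:
            states[name] = "Failed"
        return states[name]

    for name in tasks:
        run(name)
    return states

def failing_deploy():
    raise RuntimeError("deploy error")

states = run_pipeline(
    tasks={
        "ingest": lambda: None,
        "train": lambda: None,
        "deploy": failing_deploy,
        "monitor": lambda: None,
    },
    dependencies={"train": ["ingest"], "deploy": ["train"], "monitor": ["deploy"]},
)
print(states)  # deploy fails, so monitor is skipped
```

Real orchestrators add scheduling, retries, and observability on top of exactly this kind of dependency resolution.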
Prefect: A Modern Orchestration Tool for ML
Prefect is designed specifically for data-intensive applications with a Python-first approach. Unlike older orchestration tools, Prefect allows you to convert your existing Python code into production-ready data pipelines with minimal changes.
Why Prefect for MLOps?
- Pythonic: Works with your existing Python code
- Dynamic: DAGs can be created at runtime
- Resilient: Built-in error handling and recovery
- Observable: Comprehensive UI and monitoring
- Distributed: Can scale across multiple machines
- Modern: Actively maintained and feature-rich
Prefect Architecture Explained
Prefect has a distributed architecture with distinct components:
┌─────────────────┐     ┌───────────────┐     ┌─────────────────┐
│                 │     │               │     │                 │
│  Prefect API    │◄───►│  Prefect UI   │     │    Storage      │
│ (Orchestrator)  │     │  (Dashboard)  │     │ (S3, GCS, etc.) │
│                 │     │               │     │                 │
└────────┬────────┘     └───────────────┘     └────────┬────────┘
         │                                             │
         │                                             │
┌────────▼──────────────────────────────────┐          │
│                                           │          │
│             Prefect Agents                │◄─────────┘
│      (Workers that execute flows)         │
│                                           │
└───────────────────────────────────────────┘
Core Components:
1. Prefect API (Orion): The orchestration engine that:
   - Schedules and triggers flows
   - Tracks flow and task states
   - Stores execution history
   - Manages concurrency and dependencies
2. Prefect UI: Web-based dashboard that provides:
   - Visual representation of flows and tasks
   - Execution history and logs
   - Performance metrics
   - Administrative controls
3. Storage: Where flow code and artifacts are stored:
   - Local filesystem
   - Cloud object storage (S3, GCS, etc.)
   - Git repositories
   - Docker images
4. Agents: Workers that:
   - Poll for scheduled flows
   - Pull flow code from storage
   - Execute flows in the specified environment
   - Report results back to the API
Prefect Core Concepts in Detail
1. Tasks
Tasks are the atomic units of work in Prefect. They encapsulate individual operations and can be composed into larger workflows.
Task Definition:
```python
from datetime import timedelta

import pandas as pd
from prefect import task

@task(
    name="extract_data",                  # Custom name for the task
    retries=3,                            # Retry automatically on failure
    retry_delay_seconds=30,               # Wait between retries
    cache_key_fn=lambda context, params: params["url"],  # Cache by URL parameter
    cache_expiration=timedelta(hours=12)  # Cache for 12 hours
)
def extract_data(url: str) -> pd.DataFrame:
    """Extract data from a given URL"""
    return pd.read_csv(url)
```
Task Properties:
- Name: Human-readable identifier for the task
- Retries: Number of times to retry on failure
- Timeout: Maximum execution time
- Tags: Metadata for filtering and organization
- Cache: Store and reuse task results
- Result Storage: Where to store task outputs
- Result Handlers: How to serialize/deserialize outputs
Task States:
Tasks can be in various states during execution:
- Pending: Ready to run
- Running: Currently executing
- Completed: Successfully finished
- Failed: Encountered an error
- Retrying: Failed but will retry
- Cancelled: Manually stopped
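The transitions between Running, Retrying, Completed, and Failed can be sketched in plain Python. This is an illustration of the retry semantics behind `retries` and `retry_delay_seconds`, not Prefect's actual state machine:

```python
import time

# Re-run a task up to `retries` extra times, waiting between attempts,
# mirroring the Pending -> Running -> Retrying -> Completed/Failed lifecycle.
def run_with_retries(fn, retries=3, retry_delay_seconds=0):
    for attempt in range(retries + 1):
        try:
            return "Completed", fn()   # success: final state is Completed
        except Exception:
            if attempt < retries:      # failure with budget left: Retrying
                time.sleep(retry_delay_seconds)
    return "Failed", None              # retries exhausted: Failed

attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return "data"

print(run_with_retries(flaky, retries=3))  # -> ('Completed', 'data')
```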
2. Flows
Flows are the main unit of work in Prefect. They coordinate the execution of tasks and manage dependencies.
Flow Definition:
```python
from datetime import datetime

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

@flow(
    name="Data Processing Pipeline",
    description="Extracts, transforms, and loads data",
    version="1.0.0",
    task_runner=ConcurrentTaskRunner(),  # Run tasks concurrently
    retries=2,                           # Retry the entire flow if it fails
)
def process_data(date: str = None):
    """Main flow to process data"""
    # If no date is provided, use today's date
    if date is None:
        date = datetime.today().strftime("%Y-%m-%d")
    # Calling tasks automatically creates the dependencies between them
    data = extract_data(f"data/data-{date}.csv")
    processed = transform_data(data)
    load_data(processed)
    return processed
```
Flow Features:
- Task Dependencies: Automatically determined from function calls
- Error Handling: Custom exception handling
- Subflows: Nested flows for better organization
- Parameters: Runtime configuration
- State Handlers: Custom logic on state changes
- Logging: Structured logging for monitoring
- Concurrency: Parallel task execution
Flow Execution:
Flows can be executed in various ways:
- Directly calling the function
- Using `flow.serve()` for a real-time API
- Through deployments for scheduled runs
- Via the Prefect UI or API
3. Task Runners
Task runners determine how tasks within a flow are executed.
Types of Task Runners:
```python
from prefect.task_runners import SequentialTaskRunner  # Default: run tasks in sequence
from prefect.task_runners import ConcurrentTaskRunner  # Run tasks concurrently
from prefect.task_runners import DaskTaskRunner        # Distributed execution with Dask
```
Use Cases:
- SequentialTaskRunner: Simple workflows, easy debugging
- ConcurrentTaskRunner: Independent tasks that can run in parallel
- DaskTaskRunner: Large-scale data processing, distributed computing
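The practical difference between sequential and concurrent execution can be seen with the standard library alone. The sketch below uses `concurrent.futures` as a stand-in for a task runner: independent I/O-bound tasks finish in roughly the time of the slowest task instead of the sum of all of them:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def io_task(n):
    time.sleep(0.2)  # simulate an I/O-bound task (e.g. a download)
    return n * n

# Sequential: total time ~ sum of task durations (~0.8s here)
start = time.perf_counter()
sequential = [io_task(n) for n in range(4)]
seq_time = time.perf_counter() - start

# Concurrent: independent tasks overlap (~0.2s here)
start = time.perf_counter()
with ThreadPoolExecutor() as pool:
    concurrent = list(pool.map(io_task, range(4)))
con_time = time.perf_counter() - start

print(sequential == concurrent)  # -> True (same results, different schedule)
print(con_time < seq_time)       # -> True
```

`ConcurrentTaskRunner` applies the same idea to task runs inside a flow, while `DaskTaskRunner` spreads them across a cluster.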
4. Deployments
Deployments make flows executable outside your local Python environment.
Creating a Deployment:
```python
from prefect.deployments import Deployment
from prefect.orion.schemas.schedules import CronSchedule
from prefect.infrastructure.process import Process

# Create a deployment from a flow
deployment = Deployment.build_from_flow(
    flow=process_data,
    name="daily-data-processing",
    version="1",
    schedule=CronSchedule(cron="0 0 * * *"),  # Daily at midnight
    infrastructure=Process(),                 # Run as a local process
    tags=["production", "data-pipeline"],
    parameters={"date": None},                # Default parameters
)

# Register the deployment with the Prefect API
deployment.apply()
```
Deployment Properties:
- Name: Identifier for the deployment
- Schedule: When to execute the flow
- Infrastructure: Where to run the flow
- Storage: Where to store the flow code
- Parameters: Default parameters for the flow
- Tags: For organizing and filtering deployments
5. Schedules
Schedules determine when flows should be executed automatically.
Types of Schedules:
```python
from datetime import timedelta

from prefect.orion.schemas.schedules import (
    CronSchedule,      # Based on cron expressions
    IntervalSchedule,  # Run at regular intervals
    RRuleSchedule,     # Based on iCalendar recurrence rules
)

# Examples
cron_schedule = CronSchedule(cron="0 9 * * 1-5")                   # Weekdays at 9 AM
interval_schedule = IntervalSchedule(interval=timedelta(hours=4))  # Every 4 hours
```
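The arithmetic behind an interval schedule is simple to sketch with `datetime` alone. This is an illustration of the idea (an anchor date plus fixed-size ticks), not Prefect's scheduler code:

```python
from datetime import datetime, timedelta

# Compute the next fire times of an every-N-hours interval schedule.
def next_runs(anchor, interval, after, count=3):
    """Return the first `count` run times strictly after `after`."""
    # Jump straight to the first tick after `after` instead of looping
    elapsed = after - anchor
    ticks = elapsed // interval + 1
    first = anchor + ticks * interval
    return [first + i * interval for i in range(count)]

anchor = datetime(2023, 1, 1, 0, 0)
runs = next_runs(anchor, timedelta(hours=4), after=datetime(2023, 1, 1, 9, 30))
print(runs[0])  # -> 2023-01-01 12:00:00
```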
6. Results and Persistence
Prefect can store and track results from task and flow runs.
Persisting Results:
```python
from prefect import task
from prefect.serializers import JSONSerializer
from prefect_aws.s3 import S3Bucket  # requires the prefect-aws package

@task(
    persist_result=True,                        # Store the result
    result_serializer=JSONSerializer(),         # How to serialize the result
    result_storage=S3Bucket.load("my-bucket"),  # Where to store the result
)
def calculate_metrics(data):
    # Process data and return metrics
    return {"accuracy": 0.95, "precision": 0.92}
```
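Stripped of the Prefect machinery, result persistence amounts to serializing a task's return value and writing it to storage under a key. A stdlib-only sketch, where the `persist_result`/`load_result` helpers and the run id are hypothetical and a temp directory stands in for local or S3 result storage:

```python
import json
import tempfile
from pathlib import Path

# Temp directory as a stand-in for result storage (local disk, S3, GCS, ...)
storage = Path(tempfile.mkdtemp())

def persist_result(task_run_id, result):
    # Serialize with JSON (the JSONSerializer role) and write under a key
    (storage / f"{task_run_id}.json").write_text(json.dumps(result))

def load_result(task_run_id):
    return json.loads((storage / f"{task_run_id}.json").read_text())

metrics = {"accuracy": 0.95, "precision": 0.92}
persist_result("calculate-metrics-001", metrics)
restored = load_result("calculate-metrics-001")
print(restored == metrics)  # -> True
```

Because results live outside the process, a later flow run (or a human) can inspect them without re-executing the task.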
Building an ML Pipeline with Prefect: Complete Example
The following example demonstrates a complete ML pipeline for NYC taxi trip duration prediction:
```python
import os
import pickle
from datetime import datetime, timedelta

import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
from prefect.deployments import Deployment
from prefect.orion.schemas.schedules import CronSchedule
from prefect.logging import get_run_logger

# Data ingestion task with retries
@task(retries=3, retry_delay_seconds=5, name="download_taxi_data")
def get_data(date):
    """Download taxi data for the given date range"""
    logger = get_run_logger()
    train_date = datetime.strptime(date, '%Y-%m-%d')
    val_date = train_date + timedelta(days=28)
    train_month = train_date.month
    val_month = val_date.month
    train_year = train_date.year
    val_year = val_date.year
    train_file = f"yellow_tripdata_{train_year}-{train_month:02d}.parquet"
    val_file = f"yellow_tripdata_{val_year}-{val_month:02d}.parquet"
    logger.info(f"Downloading training data: {train_file}")
    train_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{train_file}"
    train_data = pd.read_parquet(train_url)
    logger.info(f"Downloading validation data: {val_file}")
    val_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{val_file}"
    val_data = pd.read_parquet(val_url)
    return train_data, val_data

# Data preprocessing task
@task(name="prepare_taxi_features")
def prepare_features(df, categorical_features):
    """Prepare the features for training"""
    logger = get_run_logger()
    # Calculate trip duration in minutes
    df['duration'] = (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.total_seconds() / 60
    # Filter outliers
    logger.info(f"Initial shape: {df.shape}")
    df = df[(df.duration >= 1) & (df.duration <= 60)]
    logger.info(f"Filtered shape: {df.shape}")
    # Create a combined pickup/dropoff feature
    df['PU_DO'] = df['PULocationID'].astype(str) + '_' + df['DOLocationID'].astype(str)
    # Process categoricals
    df[categorical_features] = df[categorical_features].fillna(-1).astype('str')
    return df

# Model training task
@task(name="train_regression_model")
def train_model(df, categorical_features):
    """Train the model with the given data"""
    logger = get_run_logger()
    dicts = df[categorical_features].to_dict(orient='records')
    dv = DictVectorizer()
    X_train = dv.fit_transform(dicts)
    y_train = df['duration'].values
    logger.info(f"Training on {X_train.shape[0]} examples")
    lr = LinearRegression()
    lr.fit(X_train, y_train)
    return lr, dv

# Model evaluation task
@task(name="evaluate_model_performance")
def evaluate_model(model, dv, df, categorical_features):
    """Evaluate the model performance"""
    logger = get_run_logger()
    dicts = df[categorical_features].to_dict(orient='records')
    X_val = dv.transform(dicts)
    y_val = df['duration'].values
    y_pred = model.predict(X_val)
    rmse = mean_squared_error(y_val, y_pred, squared=False)
    logger.info(f"RMSE: {rmse:.3f}")
    return rmse

# Model artifacts storage task
@task(name="save_model_artifacts")
def save_model(model, dv, date):
    """Save the model and DictVectorizer"""
    logger = get_run_logger()
    # Create the directory if it doesn't exist
    os.makedirs("models", exist_ok=True)
    with open(f'models/model-{date}.pkl', 'wb') as f:
        pickle.dump(model, f)
    with open(f'models/dv-{date}.pkl', 'wb') as f:
        pickle.dump(dv, f)
    logger.info(f"Model saved: models/model-{date}.pkl")
    return True

# Main flow that orchestrates all tasks
@flow(name="taxi-duration-prediction", task_runner=SequentialTaskRunner())
def main(date=None):
    """Main flow for taxi duration prediction model training"""
    if date is None:
        date = datetime.today().strftime('%Y-%m-%d')
    logger = get_run_logger()
    logger.info(f"Starting training pipeline for date: {date}")
    categorical_features = ['PULocationID', 'DOLocationID', 'PU_DO']
    # 1. Get data
    train_data, val_data = get_data(date)
    # 2. Process training data
    df_train = prepare_features(df=train_data, categorical_features=categorical_features)
    # 3. Train model
    model, dv = train_model(df=df_train, categorical_features=categorical_features)
    # 4. Process validation data
    df_val = prepare_features(df=val_data, categorical_features=categorical_features)
    # 5. Evaluate model
    rmse = evaluate_model(
        model=model,
        dv=dv,
        df=df_val,
        categorical_features=categorical_features,
    )
    # 6. Save model
    save_model(model=model, dv=dv, date=date)
    logger.info("Pipeline completed successfully!")
    return rmse

# Define a deployment for production
def create_deployment():
    return Deployment.build_from_flow(
        flow=main,
        name="nyc-taxi-monthly-training",
        schedule=CronSchedule(cron="0 0 1 * *"),  # First day of each month at midnight
        tags=["production", "mlops", "taxi-duration"],
    )

# For local development/testing
if __name__ == "__main__":
    main("2023-01-01")
```
MLOps Workflow Integration
In the MLOps lifecycle, Prefect serves as the workflow orchestration layer that connects:
- Data Engineering: Extracting and preparing data for ML
- Experimentation: Running and tracking model experiments
- Continuous Training: Automating regular model retraining
- Deployment: Pushing models to production
- Monitoring: Tracking model performance over time
Integration with MLflow
Prefect works well with MLflow for experiment tracking:
```python
import mlflow
import mlflow.sklearn
from prefect import task
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

@task
def train_with_mlflow(X_train, y_train, params):
    with mlflow.start_run():
        # Log parameters
        mlflow.log_params(params)
        # Train model
        model = RandomForestRegressor(**params)
        model.fit(X_train, y_train)
        # Log metrics
        train_rmse = mean_squared_error(y_train, model.predict(X_train), squared=False)
        mlflow.log_metric("train_rmse", train_rmse)
        # Log model
        mlflow.sklearn.log_model(model, "model")
    return model
```
Prefect vs. Other Orchestration Tools
Prefect vs. Airflow
| Feature | Prefect | Airflow |
|---|---|---|
| Programming Model | Python-native, tasks as functions | DAGs as configuration |
| Dynamic Workflows | Yes, DAGs can change at runtime | Limited, mostly static DAGs |
| Error Handling | Rich, automated retries and custom handlers | Basic retry policies |
| UI/UX | Modern, task-oriented | Traditional, DAG-oriented |
| Parametrization | First-class support | More complex implementation |
| Learning Curve | Lower for Python developers | Steeper |
| Community | Growing | Large, established |
Prefect vs. Kubeflow
| Feature | Prefect | Kubeflow |
|---|---|---|
| Focus | General workflow orchestration | Kubernetes-native ML pipelines |
| Deployment | Multiple options (local, cloud, k8s) | Primarily Kubernetes |
| Integration | Python ecosystem | Container-based components |
| Complexity | Lower barrier to entry | Higher complexity |
| ML Specific | General purpose, adaptable to ML | Built specifically for ML |
Getting Started with Prefect in 5 Minutes
Installation
```bash
pip install prefect
```
Start the Prefect Server
```bash
prefect server start
```
Create a Simple Flow
```python
from prefect import task, flow

@task
def say_hello(name):
    return f"Hello, {name}!"

@flow
def hello_flow(name="world"):
    result = say_hello(name)
    print(result)
    return result

if __name__ == "__main__":
    hello_flow("MLOps Zoomcamp")
```
Create a Deployment
```bash
prefect deployment build hello_flow.py:hello_flow -n my-first-deployment -q default
prefect deployment apply hello_flow-deployment.yaml
```
Start an Agent
```bash
prefect agent start -q default
```
Best Practices for ML Workflows with Prefect
- Task Granularity: Create tasks at the right level: not too fine-grained, not too coarse
- Error Boundaries: Place task boundaries around operations that might fail
- Parameterize Flows: Make flows configurable with parameters
- Logging: Use the Prefect logger to capture important information
- Resource Management: Clean up resources in task teardown
- Caching Strategy: Cache expensive computations but be mindful of data changes
- Testing: Test flows and tasks independently
- Version Control: Track flow code in version control
- Documentation: Document flow purpose, inputs, and outputs
- Monitoring: Set up notifications for critical flow failures
Advanced Prefect Features for ML
Dask Integration for Distributed Training
```python
from prefect.task_runners import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def distributed_training_flow():
    # Submitted tasks are executed on a Dask cluster;
    # train_model_fold is a previously defined @task
    results = []
    for i in range(10):
        results.append(train_model_fold.submit(fold_id=i))
    return results
```
Storage Options for Large Models
```python
from prefect.deployments import Deployment
from prefect.filesystems import S3

# Register an S3 storage block
s3_block = S3(bucket_path="my-model-registry")
s3_block.save("model-storage")

# Use it in a deployment
deployment = Deployment.build_from_flow(
    flow=train_flow,
    name="distributed-training",
    storage=S3.load("model-storage"),
)
```
Notifications for Critical Failures
```python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

slack_webhook = SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ")
slack_webhook.save("ml-alerts")

def notify_slack(flow, flow_run, state):
    SlackWebhook.load("ml-alerts").notify(f"Flow {flow_run.name} failed: {state.message}")

@flow(on_failure=[notify_slack])
def critical_training_flow():
    # This flow sends a Slack message if it fails
    ...
```