In this tutorial, you'll learn how to build a complete data pipeline using Dotflow — a lightweight Python library that requires zero infrastructure.
No Redis. No RabbitMQ. No Postgres. No Docker. Just pip install dotflow.
## What we'll build
A pipeline that:
- Extracts user data from a source
- Transforms it by filtering active users and calculating stats
- Loads the results into storage
Along the way, we'll add retry with backoff, parallel execution, checkpoint/resume, and cron scheduling.
## Step 1 — Install Dotflow

```bash
pip install dotflow
```
## Step 2 — Create your first pipeline

Create a file called `pipeline.py`:
```python
from dotflow import DotFlow, action


@action
def extract():
    """Simulate extracting data from a database or API."""
    return {
        "users": [
            {"name": "Alice", "age": 30, "active": True},
            {"name": "Bob", "age": 25, "active": False},
            {"name": "Charlie", "age": 35, "active": True},
            {"name": "Diana", "age": 28, "active": True},
        ]
    }


@action
def transform(previous_context):
    """Filter active users and calculate stats."""
    users = previous_context.storage["users"]
    active = [u for u in users if u["active"]]
    avg_age = sum(u["age"] for u in active) / len(active)
    return {
        "active_users": active,
        "total": len(active),
        "average_age": round(avg_age, 1),
    }


@action
def load(previous_context):
    """Print the final results."""
    data = previous_context.storage
    print(f"Active users: {data['total']}")
    print(f"Average age: {data['average_age']}")
    for user in data["active_users"]:
        print(f"  - {user['name']} ({user['age']})")
    return data


workflow = DotFlow()
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)
workflow.start()
```
Run it:

```bash
python pipeline.py
```

Output:

```
Active users: 3
Average age: 31.0
  - Alice (30)
  - Charlie (35)
  - Diana (28)
```
That's a working pipeline in 40 lines. Let's make it production-ready.
## Step 3 — Add retry and timeout
Real data sources fail. Let's make the extract step resilient:
```python
import requests


@action(retry=3, timeout=10, backoff=True)
def extract():
    """Fetch users from an API with automatic retry."""
    response = requests.get("https://api.example.com/users")
    response.raise_for_status()
    return {"users": response.json()}
```
What this does:
- `retry=3` — tries up to 3 times before failing
- `timeout=10` — each attempt has a 10-second limit
- `backoff=True` — waits 1s, then 2s, then 4s between retries
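The doubling schedule above can be sketched in a few lines; this illustrates the delays described here, not Dotflow's internal implementation:

```python
def backoff_delays(retries: int, base: float = 1.0) -> list[float]:
    """Delay in seconds before retry n: base * 2**n (1s, 2s, 4s for three retries)."""
    return [base * 2**attempt for attempt in range(retries)]
```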
If all retries fail, the task is marked as FAILED with full error tracking:
```python
for task in workflow.result_task():
    if task.errors:
        for error in task.errors:
            print(f"Attempt {error.attempt}: {error.message}")
```
## Step 4 — Run tasks in parallel
If your tasks are independent, run them simultaneously:
```python
@action(retry=3, timeout=10, backoff=True)
def fetch_users():
    response = requests.get("https://api.example.com/users")
    response.raise_for_status()
    return {"users": response.json()}


@action(retry=3, timeout=10, backoff=True)
def fetch_orders():
    response = requests.get("https://api.example.com/orders")
    response.raise_for_status()
    return {"orders": response.json()}


@action(retry=3, timeout=10, backoff=True)
def fetch_products():
    response = requests.get("https://api.example.com/products")
    response.raise_for_status()
    return {"products": response.json()}


workflow = DotFlow()
workflow.task.add(step=fetch_users)
workflow.task.add(step=fetch_orders)
workflow.task.add(step=fetch_products)
workflow.start(mode="parallel")
```
All three API calls run at the same time. If one fails, the others continue (with `keep_going=True`):

```python
workflow.start(mode="parallel", keep_going=True)
```
### Using groups

Need groups to run in parallel while the steps inside each group stay sequential? Use `group_name`:
```python
workflow = DotFlow()

# These two run sequentially (same group)
workflow.task.add(step=fetch_users, group_name="users")
workflow.task.add(step=process_users, group_name="users")

# These two run sequentially (same group)
workflow.task.add(step=fetch_orders, group_name="orders")
workflow.task.add(step=process_orders, group_name="orders")

# The two groups run in parallel
workflow.start()
```
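The execution model described above (sequential within a group, parallel across groups) can be sketched with the standard library; Dotflow handles this internally, so this only illustrates the semantics:

```python
from concurrent.futures import ThreadPoolExecutor


def run_groups(groups):
    """groups: dict mapping group_name -> list of zero-arg callables."""

    def run_group(steps):
        # Steps within one group run in order, each seeing the previous result's side effects.
        return [step() for step in steps]

    # Each group gets its own worker, so groups execute concurrently.
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(run_group, steps) for name, steps in groups.items()}
        return {name: future.result() for name, future in futures.items()}
```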
## Step 5 — Save results with storage providers
By default, results stay in memory. To persist them:
### File storage
```python
from dotflow import DotFlow, Config, action
from dotflow.providers import StorageFile

config = Config(storage=StorageFile(path=".output"))
workflow = DotFlow(config=config)

workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)
workflow.start()
```
Results are saved as JSON files in `.output/tasks/`.
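To browse persisted results later, you can walk that directory with the standard library. A minimal sketch, assuming each task is saved as a standalone `.json` file (the exact filenames StorageFile writes are an implementation detail):

```python
import json
from pathlib import Path


def read_task_results(output_dir: str = ".output/tasks") -> dict:
    """Load every persisted task result, keyed by file name (without extension)."""
    results = {}
    for path in sorted(Path(output_dir).glob("*.json")):
        results[path.stem] = json.loads(path.read_text())
    return results
```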
### AWS S3

```bash
pip install dotflow[aws]
```

```python
from dotflow.providers import StorageS3

config = Config(storage=StorageS3(bucket="my-bucket", prefix="pipelines/"))
```
### Google Cloud Storage

```bash
pip install dotflow[gcp]
```

```python
from dotflow.providers import StorageGCS

config = Config(storage=StorageGCS(bucket="my-bucket", project="my-project"))
```
## Step 6 — Checkpoint and resume
Long pipeline crashed at step 47? Don't restart from zero:
```python
config = Config(storage=StorageFile())
workflow = DotFlow(config=config, workflow_id="550e8400-e29b-41d4-a716-446655440000")

workflow.task.add(step=step_1)
workflow.task.add(step=step_2)
workflow.task.add(step=step_3)
# ... many steps

# First run — saves a checkpoint after each completed task
workflow.start()

# Pipeline crashed at step_3. Fix the bug and resume:
workflow.start(resume=True)  # skips step_1 and step_2 automatically
```
Two requirements:
- A persistent storage provider (file, S3, or GCS)
- A fixed `workflow_id` (so it knows which checkpoints to load)
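One way to meet the fixed-ID requirement without hard-coding UUIDs is to derive a deterministic one from the pipeline's name. The `uuid5` namespace string below is an arbitrary choice for illustration, not a Dotflow convention:

```python
import uuid


def stable_workflow_id(pipeline_name: str) -> str:
    """Same name in, same UUID out, so every rerun resumes the same workflow."""
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"dotflow.pipelines.{pipeline_name}"))
```

Pass the result as, e.g., `workflow_id=stable_workflow_id("user-etl")`.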
## Step 7 — Schedule with cron
Run your pipeline on a schedule:
```bash
pip install dotflow[scheduler]
```
```python
from dotflow import DotFlow, Config, action
from dotflow.providers import SchedulerCron, StorageFile


@action(retry=3, timeout=30, backoff=True)
def extract():
    return {"users": [...]}


@action
def transform(previous_context):
    return {"processed": True}


@action
def load(previous_context):
    print("Pipeline complete!")
    return previous_context.storage


config = Config(
    storage=StorageFile(path=".output"),
    scheduler=SchedulerCron(cron="0 */6 * * *"),  # every 6 hours
)

workflow = DotFlow(config=config, workflow_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890")
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)
workflow.schedule(resume=True)  # runs on cron + resumes from failure
```
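As a sanity check on the expression: `0 */6 * * *` fires at minute 0 of hours 0, 6, 12, and 18. A stdlib-only sketch of that matching rule (a real scheduler parses the full cron grammar):

```python
from datetime import datetime


def matches_schedule(dt: datetime) -> bool:
    """True when dt matches '0 */6 * * *': minute 0 of every sixth hour."""
    return dt.minute == 0 and dt.hour % 6 == 0
```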
What happens when a new execution triggers while the previous one is still running? Choose an overlap strategy:
```python
# Skip — drop the new run (default)
SchedulerCron(cron="*/5 * * * *", overlap="skip")

# Queue — buffer one pending run
SchedulerCron(cron="*/5 * * * *", overlap="queue")

# Parallel — up to 10 concurrent executions
SchedulerCron(cron="*/5 * * * *", overlap="parallel")
```
## Step 8 — Get notified on failure
Send a Telegram message when a task fails:
```python
from dotflow import Config
from dotflow.providers import NotifyTelegram, StorageFile
from dotflow.core.types.status import TypeStatus

config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyTelegram(
        token="YOUR_BOT_TOKEN",
        chat_id=123456789,
        notification_type=TypeStatus.FAILED,
    ),
)
```
## Step 9 — Inspect results
After execution, inspect everything:
```python
workflow.start()

# Task objects with status, duration, errors
for task in workflow.result_task():
    print(f"Task {task.task_id}: {task.status} ({task.duration}s)")

# Just the return values
for storage in workflow.result_storage():
    print(storage)

# Serialized result (Pydantic model)
result = workflow.result()
```
## Step 10 — Run from the CLI
No need to write a script. Run directly from the terminal:
```bash
# Simple execution
dotflow start --step pipeline.extract

# With execution mode
dotflow start --step pipeline.extract --mode parallel

# With file storage
dotflow start --step pipeline.extract --storage file --path .output

# Scheduled
dotflow schedule --step pipeline.extract --cron "0 */6 * * *" --storage file
```
## Complete example
Here's everything together — a production-ready pipeline in one file:
```python
from dotflow import DotFlow, Config, action
from dotflow.providers import StorageFile, NotifyTelegram
from dotflow.core.types.status import TypeStatus


@action(retry=3, timeout=30, backoff=True)
def extract():
    return {
        "users": [
            {"name": "Alice", "age": 30, "active": True},
            {"name": "Bob", "age": 25, "active": False},
            {"name": "Charlie", "age": 35, "active": True},
        ]
    }


@action
def transform(previous_context):
    users = previous_context.storage["users"]
    active = [u for u in users if u["active"]]
    return {"active_users": active, "count": len(active)}


@action
def load(previous_context):
    data = previous_context.storage
    print(f"Loaded {data['count']} active users")
    return data


config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyTelegram(
        token="YOUR_BOT_TOKEN",
        chat_id=123456789,
        notification_type=TypeStatus.FAILED,
    ),
)

workflow = DotFlow(config=config, workflow_id="d4e5f6a7-b8c9-0123-4567-89abcdef0123")
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)
workflow.start()

# Inspect results
for task in workflow.result_task():
    print(f"Task {task.task_id}: {task.status}")
```
## What's next?
Dotflow is open source (MIT license) and actively maintained. If you found this useful, a star on GitHub helps the project grow.
Questions? Drop them in the comments.