Fernando Celmer

How to Create a Pipeline with Dotflow in Python

In this tutorial, you'll learn how to build a complete data pipeline using Dotflow — a lightweight Python library that requires zero infrastructure.

No Redis. No RabbitMQ. No Postgres. No Docker. Just pip install dotflow.

What we'll build

A pipeline that:

  1. Extracts user data from a source
  2. Transforms it by filtering active users and calculating stats
  3. Loads the results into storage

Along the way, we'll add retry with backoff, parallel execution, checkpoint/resume, and cron scheduling.

Step 1 — Install Dotflow

pip install dotflow

Step 2 — Create your first pipeline

Create a file called pipeline.py:

from dotflow import DotFlow, action


@action
def extract():
    """Simulate extracting data from a database or API."""
    return {
        "users": [
            {"name": "Alice", "age": 30, "active": True},
            {"name": "Bob", "age": 25, "active": False},
            {"name": "Charlie", "age": 35, "active": True},
            {"name": "Diana", "age": 28, "active": True},
        ]
    }


@action
def transform(previous_context):
    """Filter active users and calculate stats."""
    users = previous_context.storage["users"]
    active = [u for u in users if u["active"]]
    # Guard against an empty list so the average never divides by zero.
    avg_age = sum(u["age"] for u in active) / len(active) if active else 0.0

    return {
        "active_users": active,
        "total": len(active),
        "average_age": round(avg_age, 1),
    }


@action
def load(previous_context):
    """Print the final results."""
    data = previous_context.storage
    print(f"Active users: {data['total']}")
    print(f"Average age: {data['average_age']}")
    for user in data["active_users"]:
        print(f"  - {user['name']} ({user['age']})")
    return data


workflow = DotFlow()
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)

workflow.start()

Run it:

python pipeline.py

Output:

Active users: 3
Average age: 31.0
  - Alice (30)
  - Charlie (35)
  - Diana (28)
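
To make the data flow concrete, here is a plain-Python sketch of the chaining idea: each step receives whatever the previous step returned. It is not Dotflow's implementation. The helper run_sequential and the step names are hypothetical, and Dotflow hands the previous result over through a context object (the previous_context.storage used above) rather than passing it directly.

```python
from typing import Any, Callable, List


def run_sequential(steps: List[Callable[[Any], Any]]) -> Any:
    """Call each step with the previous step's return value."""
    result = None
    for step in steps:
        result = step(result)
    return result


def extract_step(_previous):
    # The first step has no predecessor, so its argument is ignored.
    return {
        "users": [
            {"name": "Alice", "age": 30, "active": True},
            {"name": "Bob", "age": 25, "active": False},
        ]
    }


def transform_step(previous):
    active = [u for u in previous["users"] if u["active"]]
    return {"active_users": active, "total": len(active)}


final = run_sequential([extract_step, transform_step])
print(final["total"])
```

The order of the list is the order of execution, which mirrors the order of the workflow.task.add calls.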

The pipeline.py script is a working pipeline in under 50 lines. Let's make it production-ready.

Step 3 — Add retry and timeout

Real data sources fail. Let's make the extract step resilient:

import requests
from dotflow import action


@action(retry=3, timeout=10, backoff=True)
def extract():
    """Fetch users from an API with automatic retry."""
    response = requests.get("https://api.example.com/users")
    response.raise_for_status()
    return {"users": response.json()}

What this does:

  • retry=3: tries up to 3 times before failing
  • timeout=10: each attempt has a 10-second limit
  • backoff=True: the wait doubles after each failed attempt (1s, then 2s, and so on)
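
If you want to see what retry with exponential backoff amounts to, here is a small standard-library sketch. It is an illustration under assumptions, not Dotflow's code: call_with_retry, base_delay, and the injectable sleep parameter are hypothetical names, and the per-attempt timeout is left out to keep it short.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func up to `attempts` times, doubling the wait after each failure."""
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay *= 2
    raise ValueError("attempts must be at least 1")


# Demo: a function that fails twice, then succeeds.
calls = {"count": 0}
waits = []


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Record the waits instead of sleeping so the demo finishes instantly.
result = call_with_retry(flaky, attempts=3, sleep=waits.append)
print(result, waits)
```

With three attempts there are at most two waits, because nothing is left to wait for after the final failure.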

If all retries fail, the task is marked as FAILED with full error tracking:

for task in workflow.result_task():
    if task.errors:
        for error in task.errors:
            print(f"Attempt {error.attempt}: {error.message}")

Step 4 — Run tasks in parallel

If your tasks are independent, run them simultaneously:

@action(retry=3, timeout=10, backoff=True)
def fetch_users():
    response = requests.get("https://api.example.com/users")
    response.raise_for_status()
    return {"users": response.json()}


@action(retry=3, timeout=10, backoff=True)
def fetch_orders():
    response = requests.get("https://api.example.com/orders")
    response.raise_for_status()
    return {"orders": response.json()}


@action(retry=3, timeout=10, backoff=True)
def fetch_products():
    response = requests.get("https://api.example.com/products")
    response.raise_for_status()
    return {"products": response.json()}


workflow = DotFlow()
workflow.task.add(step=fetch_users)
workflow.task.add(step=fetch_orders)
workflow.task.add(step=fetch_products)

workflow.start(mode="parallel")

All three API calls run at the same time. To let the remaining tasks continue when one of them fails, pass keep_going=True:

workflow.start(mode="parallel", keep_going=True)
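
The keep-going idea can be sketched with the standard library: run independent callables concurrently and collect failures instead of stopping at the first one. This is only an illustration. run_parallel and the stub functions are hypothetical, and threads are used here for simplicity, which may differ from how Dotflow runs parallel tasks.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(tasks, keep_going=True):
    """Run zero-argument callables in threads; return (results, errors) by name."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max(len(tasks), 1)) as pool:
        futures = {name: pool.submit(func) for name, func in tasks.items()}
        for name, future in futures.items():
            try:
                results[name] = future.result()
            except Exception as exc:
                errors[name] = exc
                if not keep_going:
                    # Cancel tasks that have not started, then stop collecting.
                    for other in futures.values():
                        other.cancel()
                    break
    return results, errors


def fetch_users_stub():
    return {"users": ["Alice"]}


def fetch_orders_stub():
    raise RuntimeError("orders API is down")


def fetch_products_stub():
    return {"products": ["Widget"]}


results, errors = run_parallel(
    {
        "users": fetch_users_stub,
        "orders": fetch_orders_stub,
        "products": fetch_products_stub,
    }
)
print(sorted(results), sorted(errors))
```

The failing orders call ends up in errors while the other two results are still returned.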

Using groups

Need groups to run in parallel while the tasks inside each group run in order? Use group_name:

workflow = DotFlow()

# These two run sequentially (same group).
# process_users and process_orders are assumed to be @action steps defined elsewhere.
workflow.task.add(step=fetch_users, group_name="users")
workflow.task.add(step=process_users, group_name="users")

# These two run sequentially (same group)
workflow.task.add(step=fetch_orders, group_name="orders")
workflow.task.add(step=process_orders, group_name="orders")

# The two groups run in parallel
workflow.start()
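
Here is a rough standard-library picture of that execution shape: groups run side by side, and the steps inside each group run one after another. It assumes nothing about Dotflow internals; run_group, run_groups, and the lambda steps are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def run_group(steps):
    """Run one group's steps in order, feeding each result to the next step."""
    result = None
    for step in steps:
        result = step(result)
    return result


def run_groups(groups):
    """Run every group in its own thread; steps inside a group stay ordered."""
    with ThreadPoolExecutor(max_workers=max(len(groups), 1)) as pool:
        futures = {
            name: pool.submit(run_group, steps) for name, steps in groups.items()
        }
        return {name: future.result() for name, future in futures.items()}


outcome = run_groups(
    {
        "users": [lambda _: ["Alice", "Bob"], lambda users: len(users)],
        "orders": [lambda _: [10, 20], lambda orders: sum(orders)],
    }
)
print(outcome)
```

Each group returns the value of its last step, so outcome maps every group name to one final result.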

Step 5 — Save results with storage providers

By default, results stay in memory. To persist them:

File storage

from dotflow import DotFlow, Config, action
from dotflow.providers import StorageFile

config = Config(storage=StorageFile(path=".output"))

workflow = DotFlow(config=config)
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)

workflow.start()

Results are saved as JSON files in .output/tasks/.
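
Because the results are plain JSON, you can read them back with the standard library. The sketch below assumes only that the directory contains .json files. The helper read_json_results and the sample file name task_0.json are hypothetical, and the demo writes to a temporary directory rather than guessing how Dotflow names its files.

```python
import json
import tempfile
from pathlib import Path


def read_json_results(directory):
    """Return {file name: parsed JSON} for every .json file in directory."""
    folder = Path(directory)
    if not folder.is_dir():
        return {}
    return {
        path.name: json.loads(path.read_text(encoding="utf-8"))
        for path in sorted(folder.glob("*.json"))
    }


# Demo against a temporary directory with one hypothetical result file.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "task_0.json"
    sample.write_text(json.dumps({"total": 3}), encoding="utf-8")
    loaded = read_json_results(tmp)

print(loaded)
```

After a real run you would call read_json_results(".output/tasks") instead.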

AWS S3

pip install "dotflow[aws]"

Then point the config at your bucket:

from dotflow import Config
from dotflow.providers import StorageS3

config = Config(storage=StorageS3(bucket="my-bucket", prefix="pipelines/"))

Google Cloud Storage

pip install "dotflow[gcp]"

Then point the config at your bucket:

from dotflow import Config
from dotflow.providers import StorageGCS

config = Config(storage=StorageGCS(bucket="my-bucket", project="my-project"))

Step 6 — Checkpoint and resume

Long pipeline crashed at step 47? Don't restart from zero:

from dotflow import DotFlow, Config
from dotflow.providers import StorageFile

config = Config(storage=StorageFile())

workflow = DotFlow(config=config, workflow_id="550e8400-e29b-41d4-a716-446655440000")
workflow.task.add(step=step_1)
workflow.task.add(step=step_2)
workflow.task.add(step=step_3)
# ... many steps

# First run — saves checkpoint after each completed task
workflow.start()

# Pipeline crashed at step_3. Fix the bug and resume:
workflow.start(resume=True)  # skips step_1 and step_2 automatically

Two requirements:

  • A persistent storage provider (file, S3, or GCS)
  • A fixed workflow_id (so it knows which checkpoints to load)
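
Both requirements make sense once you see the mechanism in miniature. The sketch below is a plain-Python illustration, not Dotflow's implementation: run_with_checkpoints and the checkpoint.json file are hypothetical. The file plays the role of the persistent storage, and its fixed path plays the role of the fixed workflow_id.

```python
import json
import tempfile
from pathlib import Path


def run_with_checkpoints(steps, checkpoint_file, resume=False):
    """Run named steps in order, recording each finished step in a JSON file.

    Returns the names of the steps that actually executed in this call.
    """
    path = Path(checkpoint_file)
    done = json.loads(path.read_text()) if resume and path.exists() else []
    executed = []
    for name, step in steps:
        if name in done:
            continue  # finished in an earlier run
        step()
        done.append(name)
        path.write_text(json.dumps(done))  # checkpoint after every success
        executed.append(name)
    return executed


state = {"bug_fixed": False}


def step_3():
    if not state["bug_fixed"]:
        raise RuntimeError("bug in step_3")


steps = [("step_1", lambda: None), ("step_2", lambda: None), ("step_3", step_3)]

with tempfile.TemporaryDirectory() as tmp:
    checkpoint = Path(tmp) / "checkpoint.json"
    try:
        run_with_checkpoints(steps, checkpoint)
    except RuntimeError:
        pass  # the first run crashes at step_3
    state["bug_fixed"] = True
    resumed = run_with_checkpoints(steps, checkpoint, resume=True)

print(resumed)  # prints ['step_3']
```

Without the file on disk, or with a different path on the second run, there would be nothing to resume from.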

Step 7 — Schedule with cron

Run your pipeline on a schedule:

pip install "dotflow[scheduler]"

Then define the schedule in the config:

from dotflow import DotFlow, Config, action
from dotflow.providers import SchedulerCron, StorageFile


@action(retry=3, timeout=30, backoff=True)
def extract():
    return {"users": [...]}


@action
def transform(previous_context):
    return {"processed": True}


@action
def load(previous_context):
    print("Pipeline complete!")
    return previous_context.storage


config = Config(
    storage=StorageFile(path=".output"),
    scheduler=SchedulerCron(cron="0 */6 * * *"),  # every 6 hours
)

workflow = DotFlow(config=config, workflow_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890")
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)

workflow.schedule(resume=True)  # runs on cron + resumes from failure
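
If cron syntax is new to you, this small sketch expands a single field so you can check what an expression like "0 */6 * * *" matches. It covers only *, single numbers, a-b ranges, comma lists, and /n steps on * or on a range. expand_cron_field is a hypothetical helper and has no connection to how Dotflow parses the expression.

```python
def expand_cron_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        step = int(step) if step else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start, end = (int(x) for x in base.split("-"))
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return sorted(values)


minutes = expand_cron_field("0", 0, 59)
hours = expand_cron_field("*/6", 0, 23)
print(minutes, hours)
```

Minute 0 of hours 0, 6, 12, and 18 gives four runs per day, which is what "every 6 hours" means here.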

What happens when a new execution triggers while the previous one is still running? Choose an overlap strategy:

# Skip — drop the new run (default)
SchedulerCron(cron="*/5 * * * *", overlap="skip")

# Queue — buffer one pending run
SchedulerCron(cron="*/5 * * * *", overlap="queue")

# Parallel — up to 10 concurrent executions
SchedulerCron(cron="*/5 * * * *", overlap="parallel")
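
To show what dropping an overlapping run means, here is a minimal sketch of the skip strategy built on a non-blocking lock. It is a conceptual illustration only: SkipOverlapRunner is a hypothetical class, and the two events exist purely to make the overlap happen on demand.

```python
import threading


class SkipOverlapRunner:
    """Run a job unless a previous run is still in progress."""

    def __init__(self, job):
        self._job = job
        self._lock = threading.Lock()

    def trigger(self):
        """Return True if the job ran, False if the trigger was dropped."""
        if not self._lock.acquire(blocking=False):
            return False  # previous run still active: skip this trigger
        try:
            self._job()
            return True
        finally:
            self._lock.release()


started = threading.Event()
release = threading.Event()


def slow_job():
    started.set()
    release.wait(timeout=5)


runner = SkipOverlapRunner(slow_job)
first = threading.Thread(target=runner.trigger)
first.start()
started.wait(timeout=5)         # the first run is now in progress
overlapping = runner.trigger()  # arrives mid-run, so it is dropped
release.set()
first.join()
after = runner.trigger()        # nothing is running any more, so it executes
print(overlapping, after)
```

A queue strategy would remember the dropped trigger and run it once the lock is free, instead of returning False.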

Step 8 — Get notified on failure

Send a Telegram message when a task fails:

from dotflow import Config
from dotflow.providers import NotifyTelegram, StorageFile
from dotflow.core.types.status import TypeStatus

config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyTelegram(
        token="YOUR_BOT_TOKEN",
        chat_id=123456789,
        notification_type=TypeStatus.FAILED,
    ),
)
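
Rather than hardcoding the bot token, you may prefer to read it from the environment. The sketch below assumes two environment variable names, TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID, that are made up for this example. telegram_settings is a hypothetical helper that returns only the token and chat_id values shown in the config above.

```python
import os


def telegram_settings(env=None):
    """Read bot credentials from environment variables (names are hypothetical)."""
    env = os.environ if env is None else env
    token = env.get("TELEGRAM_BOT_TOKEN")
    chat_id = env.get("TELEGRAM_CHAT_ID")
    if not token or not chat_id:
        raise RuntimeError("Set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID first")
    return {"token": token, "chat_id": int(chat_id)}


# Demo with an explicit mapping instead of the real environment.
settings = telegram_settings(
    {"TELEGRAM_BOT_TOKEN": "123:abc", "TELEGRAM_CHAT_ID": "123456789"}
)
print(settings["chat_id"])
```

The returned values can then be passed as the token and chat_id arguments of NotifyTelegram.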

Step 9 — Inspect results

After execution, inspect everything:

workflow.start()

# Task objects with status, duration, errors
for task in workflow.result_task():
    print(f"Task {task.task_id}: {task.status} ({task.duration}s)")

# Just the return values
for storage in workflow.result_storage():
    print(storage)

# Serialized result (Pydantic model)
result = workflow.result()
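
For longer pipelines a one-line summary is often more useful than a full listing. This sketch relies only on the status and duration attributes used above. summarize is a hypothetical helper, and the stand-in objects and their status strings are invented for the demo rather than taken from Dotflow.

```python
from collections import Counter
from types import SimpleNamespace


def summarize(tasks):
    """Count tasks per status and total the durations that are set."""
    tasks = list(tasks)
    statuses = Counter(str(task.status) for task in tasks)
    total = sum(task.duration or 0 for task in tasks)
    return dict(statuses), total


# Stand-in objects; with Dotflow you would pass workflow.result_task().
fake_tasks = [
    SimpleNamespace(status="Completed", duration=0.5),
    SimpleNamespace(status="Failed", duration=None),
    SimpleNamespace(status="Completed", duration=1.5),
]
counts, total_seconds = summarize(fake_tasks)
print(counts, total_seconds)
```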

Step 10 — Run from the CLI

You don't need a runner script. Point the CLI at a step by its dotted path (here, the extract function in pipeline.py):

# Simple execution
dotflow start --step pipeline.extract

# With execution mode
dotflow start --step pipeline.extract --mode parallel

# With file storage
dotflow start --step pipeline.extract --storage file --path .output

# Scheduled
dotflow schedule --step pipeline.extract --cron "0 */6 * * *" --storage file
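
A value like pipeline.extract reads as module.function. How the Dotflow CLI resolves it is not shown here, but a common way to turn such a dotted path into a callable uses importlib, sketched below with a hypothetical resolve_step helper and a standard-library function as the target.

```python
import importlib


def resolve_step(dotted_path):
    """Import 'package.module.function' and return the function object."""
    module_name, _, attribute = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.function', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


dumps = resolve_step("json.dumps")
print(dumps({"ok": True}))
```

For a path like this to resolve, the module has to be importable from the directory where you run the command.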

Complete example

Here's a single file that combines retries, file storage, failure notifications, and result inspection:

from dotflow import DotFlow, Config, action
from dotflow.providers import StorageFile, NotifyTelegram
from dotflow.core.types.status import TypeStatus


@action(retry=3, timeout=30, backoff=True)
def extract():
    return {
        "users": [
            {"name": "Alice", "age": 30, "active": True},
            {"name": "Bob", "age": 25, "active": False},
            {"name": "Charlie", "age": 35, "active": True},
        ]
    }


@action
def transform(previous_context):
    users = previous_context.storage["users"]
    active = [u for u in users if u["active"]]
    return {"active_users": active, "count": len(active)}


@action
def load(previous_context):
    data = previous_context.storage
    print(f"Loaded {data['count']} active users")
    return data


config = Config(
    storage=StorageFile(path=".output"),
    notify=NotifyTelegram(
        token="YOUR_BOT_TOKEN",
        chat_id=123456789,
        notification_type=TypeStatus.FAILED,
    ),
)

workflow = DotFlow(config=config, workflow_id="d4e5f6a7-b8c9-0123-4567-89abcdef0123")
workflow.task.add(step=extract)
workflow.task.add(step=transform)
workflow.task.add(step=load)

workflow.start()

# Inspect results
for task in workflow.result_task():
    print(f"Task {task.task_id}: {task.status}")

What's next?


Dotflow is open source (MIT license) and actively maintained. If you found this useful, a star on GitHub helps the project grow.

Questions? Drop them in the comments.
