DEV Community

Haji Rufai


Building a DAG Workflow Orchestration Engine from Scratch in Python

Every data engineer knows Apache Airflow. But how many have built a workflow orchestrator from scratch? Understanding the internals — topological sorting, parallel execution, trigger rules, retry logic — transforms you from a user into an architect.

In this article, I'll walk through building TaskFlow, a lightweight DAG workflow orchestration engine in Python. It's ~3,100 lines of production-quality code with 107 passing tests.

GitHub: hajirufai/taskflow


What We're Building

TaskFlow lets you define workflows as directed acyclic graphs (DAGs) of tasks, then executes them with:

  • Parallel execution of independent tasks (asyncio)
  • Dependency resolution via topological sort
  • Trigger rules (all_success, all_done, one_success, none_failed)
  • Retry with exponential backoff and jitter
  • Timeout enforcement per task
  • SQLite-backed run history
  • Cron scheduling
  • REST API (FastAPI) + Rich CLI

A run of the etl_pipeline DAG prints progress like this:

🚀 Starting DAG run: etl_pipeline
  ✅ extract              [2.30s]
  ✅ validate             [0.12s]
  ✅ transform            [1.50s]
  ✅ load                 [3.10s]
  ✅ notify               [0.05s]
✨ DAG run completed in 5.60s (5/5 tasks succeeded)

Core Architecture

┌─────────────────────────────────────────────┐
│              TaskFlow Engine                 │
├──────────┬──────────┬───────────┬───────────┤
│   DAG    │  Graph   │ Executor  │ Scheduler │
│ Registry │  Algos   │ (async)   │  (cron)   │
├──────────┴──────────┴───────────┴───────────┤
│         Storage (SQLite + aiosqlite)         │
├─────────────────────┬───────────────────────┤
│    REST API         │      Rich CLI         │
│   (FastAPI)         │     (Click)           │
└─────────────────────┴───────────────────────┘

The engine has four pillars:

  1. DAG Registry — stores workflow definitions with task configs
  2. Graph Algorithms — topological sort, cycle detection, parallel levels
  3. Async Executor — runs tasks with concurrency control
  4. Storage — persists run history to SQLite

Step 1: Graph Algorithms

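The foundation of any workflow engine is graph theory. We need two algorithms: one that orders the tasks and one that groups them into levels that can run in parallel.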

Topological Sort (Kahn's Algorithm)

This gives us a valid execution order — no task runs before its dependencies.

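from collections import deque


class CycleError(Exception):
    """Raised when the task graph is not acyclic."""


def topological_sort(nodes, edges):
    """Kahn's algorithm: O(V + E) topological ordering.

    nodes: iterable of task IDs
    edges: dict mapping each task ID to the list of its downstream task IDs
    """
    nodes = list(nodes)
    in_degree = {n: 0 for n in nodes}
    for dsts in edges.values():
        for dst in dsts:
            in_degree[dst] += 1

    # Start with every task that has no upstream dependencies
    queue = deque(n for n in nodes if in_degree[n] == 0)
    result = []

    while queue:
        node = queue.popleft()
        result.append(node)
        # A downstream task becomes ready once all its upstreams are emitted
        for downstream in edges.get(node, []):
            in_degree[downstream] -= 1
            if in_degree[downstream] == 0:
                queue.append(downstream)

    # Nodes on a cycle never reach in-degree 0, so they are never emitted
    if len(result) != len(nodes):
        raise CycleError("Graph contains a cycle!")
    return result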
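
Kahn's algorithm tells you that a cycle exists but not which tasks form it. For a more useful validation error you may also want the offending path. A minimal sketch, assuming the same edges format; find_cycle is a hypothetical helper (not part of TaskFlow as described here) and it uses a three-color depth-first search rather than Kahn's algorithm:

```python
def find_cycle(nodes, edges):
    """Return one cycle as a list of task IDs (first node repeated at the end), or None.

    Hypothetical helper. Iterative three-color DFS: WHITE = unvisited,
    GRAY = on the current path, BLACK = fully explored. Reaching a GRAY
    node again means we followed a back edge, which closes a cycle.
    """
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {n: WHITE for n in nodes}
    for start in nodes:
        if color[start] != WHITE:
            continue
        color[start] = GRAY
        path = [start]                         # current DFS path
        stack = [iter(edges.get(start, []))]   # one child iterator per path node
        while stack:
            child = next(stack[-1], None)
            if child is None:                  # all children of path[-1] explored
                color[path.pop()] = BLACK
                stack.pop()
            elif color[child] == GRAY:         # back edge: cycle found
                return path[path.index(child):] + [child]
            elif color[child] == WHITE:
                color[child] = GRAY
                path.append(child)
                stack.append(iter(edges.get(child, [])))
    return None


print(find_cycle(["a", "b", "c"], {"a": ["b"], "b": ["c"], "c": ["a"]}))
# ['a', 'b', 'c', 'a']
```

A validate() method could, for example, call this only after topological_sort raises CycleError and include the returned path in its error message.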

Parallel Level Assignment

This groups tasks that can run concurrently:

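def build_reverse_edges(edges):
    """Invert the downstream adjacency list into {task: [upstream tasks]}."""
    reverse = {}
    for src, dsts in edges.items():
        for dst in dsts:
            reverse.setdefault(dst, []).append(src)
    return reverse


def parallel_levels(nodes, edges):
    """Assign tasks to execution levels.
    Level 0 = no dependencies. Level N = max(upstream levels) + 1.
    """
    order = topological_sort(nodes, edges)
    reverse_edges = build_reverse_edges(edges)

    levels_map = {}
    # Topological order guarantees every upstream level is already known
    for node in order:
        upstreams = reverse_edges.get(node, [])
        if not upstreams:
            levels_map[node] = 0
        else:
            levels_map[node] = max(levels_map[u] for u in upstreams) + 1

    # Group by level, keeping topological order within each level
    max_level = max(levels_map.values(), default=0)
    return [
        [n for n in order if levels_map[n] == i]
        for i in range(max_level + 1)
    ]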

For a diamond DAG (extract → [transform, validate] → load), this produces:

  • Level 0: [extract]
  • Level 1: [transform, validate] ← run in parallel!
  • Level 2: [load]
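
As a cross-check, the same levels fall out of a "wave" variant of Kahn's algorithm: instead of popping one node at a time, take every node whose in-degree is currently zero as one level. A node becomes ready in the wave right after its last upstream finishes, which is exactly max(upstream levels) + 1. A minimal sketch (levels_by_waves is a hypothetical name, not TaskFlow's API) applied to the diamond DAG:

```python
def levels_by_waves(nodes, edges):
    """Group tasks into parallel levels by peeling off zero in-degree 'waves'.

    Hypothetical helper; raises ValueError (instead of the article's
    CycleError) so the block runs on its own.
    """
    nodes = list(nodes)
    in_degree = {n: 0 for n in nodes}
    for dsts in edges.values():
        for dst in dsts:
            in_degree[dst] += 1

    wave = [n for n in nodes if in_degree[n] == 0]
    levels, seen = [], 0
    while wave:
        levels.append(wave)
        seen += len(wave)
        next_wave = []
        for node in wave:
            for downstream in edges.get(node, []):
                in_degree[downstream] -= 1
                if in_degree[downstream] == 0:   # last upstream just finished
                    next_wave.append(downstream)
        wave = next_wave

    if seen != len(nodes):   # cycle members never reach in-degree 0
        raise ValueError("Graph contains a cycle!")
    return levels


diamond = {
    "extract": ["transform", "validate"],
    "transform": ["load"],
    "validate": ["load"],
}
print(levels_by_waves(["extract", "transform", "validate", "load"], diamond))
# [['extract'], ['transform', 'validate'], ['load']]
```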

Step 2: DAG Definition with Decorators

The user-facing API uses Python decorators — familiar and Pythonic:

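from taskflow import DAG, TriggerRule

# fetch_data, clean and write_to_warehouse stand in for your own functions
dag = DAG("etl_pipeline", schedule="0 2 * * *")  # cron: daily at 02:00

@dag.task()
def extract():
    return {"records": fetch_data()}

# Retry up to 3 times and enforce a per-task timeout; extract's return
# value arrives as the `extract` keyword argument
@dag.task(depends_on=["extract"], retries=3, timeout=300)
def transform(extract=None):
    return clean(extract["records"])

@dag.task(depends_on=["transform"])
def load(transform=None):
    write_to_warehouse(transform)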

Under the hood, each @dag.task() call:

  1. Creates a TaskConfig with retry/timeout/trigger settings
  2. Wraps the function in a TaskDefinition
  3. Registers edges in the DAG's adjacency list
  4. Adds to a global registry for CLI/API discovery
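
Those four steps fit in a few dozen lines. The sketch below is a simplified stand-in written for illustration, not TaskFlow's actual code: MiniDAG, TaskConfig, TaskDefinition and REGISTRY are hypothetical names, and calling tasks with their upstream results is left to the executor.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional

# Hypothetical names for illustration; not TaskFlow's actual classes
REGISTRY: Dict[str, "MiniDAG"] = {}  # global lookup for a CLI or API


@dataclass
class TaskConfig:
    depends_on: List[str] = field(default_factory=list)
    retries: int = 0
    timeout: Optional[float] = None
    trigger_rule: str = "all_success"


@dataclass
class TaskDefinition:
    task_id: str
    func: Callable
    config: TaskConfig


class MiniDAG:
    def __init__(self, dag_id: str, schedule: Optional[str] = None):
        self.dag_id = dag_id
        self.schedule = schedule
        self.tasks: Dict[str, TaskDefinition] = {}
        self.edges: Dict[str, List[str]] = {}  # upstream -> [downstream, ...]

    def task(self, depends_on=None, retries=0, timeout=None, trigger_rule="all_success"):
        def decorator(func):
            # 1. Collect the per-task settings
            config = TaskConfig(list(depends_on or []), retries, timeout, trigger_rule)
            # 2. Wrap the function, keyed by its name
            self.tasks[func.__name__] = TaskDefinition(func.__name__, func, config)
            # 3. Record edges in the adjacency list
            for upstream in config.depends_on:
                self.edges.setdefault(upstream, []).append(func.__name__)
            # 4. Make the DAG discoverable
            REGISTRY[self.dag_id] = self
            return func  # the original function stays directly callable
        return decorator

    def validate(self) -> List[str]:
        """Report dependencies on tasks that were never defined."""
        return [
            f"{task_id} depends on unknown task {dep!r}"
            for task_id, t in self.tasks.items()
            for dep in t.config.depends_on
            if dep not in self.tasks
        ]


dag = MiniDAG("etl_pipeline", schedule="0 2 * * *")

@dag.task()
def extract():
    return {"records": [1, 2, 3]}

@dag.task(depends_on=["extract"], retries=3)
def transform(extract=None):
    return [r * 10 for r in extract["records"]]

print(dag.edges)       # {'extract': ['transform']}
print(dag.validate())  # []
```

Returning the undecorated function keeps each task easy to unit-test in isolation; the DAG object only stores metadata about it.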

Step 3: The Async Executor

The executor does the actual work. It runs tasks level by level: all tasks in a level run concurrently, and the next level starts only when the current one has finished.

import asyncio

class Executor:
    async def execute(self, dag, run_id=None):
        # 1. Validate the DAG (check for cycles, missing deps)
        errors = dag.validate()
        if errors:
            raise ValueError(f"Invalid DAG: {errors}")

        # 2. Create the dag_run object that tracks per-task state
        #    (construction omitted here for brevity)

        # 3. Get parallel execution levels
        levels = parallel_levels(dag.tasks.keys(), dag.edges)

        # 4. Execute each level; gather() waits for every task in the
        #    level before the loop moves on to the next one
        for level_tasks in levels:
            coros = [
                self._execute_task(dag, task_id, dag_run)
                for task_id in level_tasks
            ]
            await asyncio.gather(*coros)  # Parallel!

        return dag_run
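
Lesson 4 at the end of this article pairs asyncio.Semaphore with asyncio.gather(). A self-contained sketch of that combination, assuming tasks are plain zero-argument async callables and a max_concurrency limit (run_levels and its parameters are hypothetical, not TaskFlow's API): each level is gathered, and the semaphore caps how many tasks run at once even when a level is wide. Exceptions are caught per task so one failure does not abort its siblings.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_levels(
    levels: List[List[str]],
    tasks: Dict[str, Callable[[], Awaitable[object]]],
    max_concurrency: int = 4,
) -> Dict[str, str]:
    """Hypothetical helper: run levels in order, tasks in a level concurrently."""
    semaphore = asyncio.Semaphore(max_concurrency)
    states: Dict[str, str] = {}

    async def run_one(task_id: str) -> None:
        async with semaphore:  # at most max_concurrency tasks in flight
            try:
                await tasks[task_id]()
                states[task_id] = "success"
            except Exception:  # record the failure instead of propagating it
                states[task_id] = "failed"

    for level in levels:
        # The next level starts only after every task in this level finishes
        await asyncio.gather(*(run_one(t) for t in level))
    return states


async def demo():
    running = peak = 0  # track how many tasks overlap

    def make_task(fail=False):
        async def task():
            nonlocal running, peak
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)
            running -= 1
            if fail:
                raise RuntimeError("boom")
        return task

    tasks = {"a": make_task(), "b": make_task(), "c": make_task(fail=True), "d": make_task()}
    states = await run_levels([["a"], ["b", "c", "d"]], tasks, max_concurrency=2)
    return states, peak


states, peak = asyncio.run(demo())
print(sorted(states.items()))
# [('a', 'success'), ('b', 'success'), ('c', 'failed'), ('d', 'success')]
print(peak)  # 2: the three-task level never exceeds the semaphore limit
```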

Trigger Rules

Before running a task, we evaluate its trigger rule against upstream states:

def _evaluate_trigger_rule(rule, depends_on, dag_run):
    """Return True if a task with this trigger rule may run now."""
    upstream_states = [dag_run.task_runs[dep].state for dep in depends_on]

    if rule == TriggerRule.ALL_SUCCESS:
        return all(s == TaskState.SUCCESS for s in upstream_states)
    elif rule == TriggerRule.ALL_DONE:
        return all(s.is_terminal for s in upstream_states)
    elif rule == TriggerRule.ONE_SUCCESS:
        return any(s == TaskState.SUCCESS for s in upstream_states)
    elif rule == TriggerRule.NONE_FAILED:
        failed = (TaskState.FAILED, TaskState.TIMED_OUT)
        return not any(s in failed for s in upstream_states)
    raise ValueError(f"Unknown trigger rule: {rule}")

This enables patterns like cleanup tasks that run even when upstream tasks fail (ALL_DONE), or notification tasks that fire when at least one branch succeeds (ONE_SUCCESS).
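
To see how the four rules differ on the same inputs, here is a self-contained version with minimal stand-in enums. TaskState, TriggerRule and the is_terminal property are modelled on the names above but are assumptions, not TaskFlow's actual definitions (the real state enum may have other members), and should_run is a hypothetical function that takes upstream states directly instead of a dag_run:

```python
from enum import Enum


class TaskState(Enum):  # stand-in; members assumed
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    TIMED_OUT = "timed_out"
    SKIPPED = "skipped"

    @property
    def is_terminal(self) -> bool:
        return self in (TaskState.SUCCESS, TaskState.FAILED,
                        TaskState.TIMED_OUT, TaskState.SKIPPED)


class TriggerRule(Enum):  # stand-in mirroring the four rules in the article
    ALL_SUCCESS = "all_success"
    ALL_DONE = "all_done"
    ONE_SUCCESS = "one_success"
    NONE_FAILED = "none_failed"


def should_run(rule: TriggerRule, upstream_states) -> bool:
    if rule == TriggerRule.ALL_SUCCESS:
        return all(s == TaskState.SUCCESS for s in upstream_states)
    if rule == TriggerRule.ALL_DONE:
        return all(s.is_terminal for s in upstream_states)
    if rule == TriggerRule.ONE_SUCCESS:
        return any(s == TaskState.SUCCESS for s in upstream_states)
    if rule == TriggerRule.NONE_FAILED:
        return not any(s in (TaskState.FAILED, TaskState.TIMED_OUT) for s in upstream_states)
    raise ValueError(f"Unknown trigger rule: {rule}")


# One branch succeeded, the other failed:
mixed = [TaskState.SUCCESS, TaskState.FAILED]
for rule in TriggerRule:
    print(rule.value, should_run(rule, mixed))
# all_success False
# all_done True
# one_success True
# none_failed False
```

With this mixed input, a cleanup task (ALL_DONE) and a "first result wins" notifier (ONE_SUCCESS) still run, while the strict rules hold their tasks back.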


Step 4: Retry with Exponential Backoff

Production workflows need retries. We implement exponential backoff with jitter to avoid thundering herds:

import random

def compute_delay(attempt, base_delay=1.0, backoff=2.0, max_delay=300.0):
    """Seconds to wait before retrying after failed attempt `attempt` (0-based)."""
    delay = base_delay * (backoff ** attempt)
    delay = min(delay, max_delay)
    # Add ±25% jitter to avoid synchronized retries
    jitter = delay * 0.25
    delay += random.uniform(-jitter, jitter)
    return max(0.0, delay)

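With base_delay=1.0 and backoff=2.0, attempts 0, 1, 2 and 3 wait roughly 1s, 2s, 4s and 8s (each ±25%), and the delay keeps doubling until it is capped at max_delay (300s by default); the jitter is applied after the cap.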
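
Combining the backoff with the per-task timeout from the feature list, a retry loop might look like the sketch below. This is an assumption about how the pieces fit together, not TaskFlow's code: run_with_retries is a hypothetical helper, compute_delay is re-declared so the block runs on its own, and asyncio.wait_for enforces the timeout, so a timed-out attempt counts as a failure and is retried.

```python
import asyncio
import random


def compute_delay(attempt, base_delay=1.0, backoff=2.0, max_delay=300.0):
    delay = min(base_delay * (backoff ** attempt), max_delay)
    jitter = delay * 0.25  # ±25%, as above
    return max(0.0, delay + random.uniform(-jitter, jitter))


async def run_with_retries(fn, retries=3, timeout=None, base_delay=1.0):
    """Hypothetical helper: await fn() up to retries + 1 times.

    Each attempt is bounded by `timeout` seconds (None = no limit);
    between failures we sleep for compute_delay(attempt).
    """
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(fn(), timeout)
        except Exception:  # includes the timeout raised by wait_for
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            await asyncio.sleep(compute_delay(attempt, base_delay=base_delay))


calls = 0

async def flaky():
    global calls
    calls += 1
    if calls < 3:  # fail the first two attempts
        raise ConnectionError("transient")
    return "ok"


result = asyncio.run(run_with_retries(flaky, retries=3, base_delay=0.01))
print(result, calls)  # ok 3
```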


Step 5: SQLite Storage

Every run is persisted to SQLite for history and debugging:

async def save_dag_run(dag_run):
    db = await get_db()
    await db.execute(
        "INSERT INTO dag_runs (run_id, dag_id, state, ...) VALUES (?, ?, ?, ...)",
        (dag_run.run_id, dag_run.dag_id, dag_run.state.value, ...),
    )
    for task_run in dag_run.task_runs.values():
        await db.execute(
            "INSERT INTO task_runs (run_id, task_id, state, ...) VALUES (...)",
            ...
        )
    # Commit so the inserts persist (Python's sqlite3, which aiosqlite
    # wraps, opens an implicit transaction before an INSERT)
    await db.commit()

We use aiosqlite for async compatibility, WAL mode for concurrent reads, and automatic cleanup of old runs.
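
To try the storage pattern without aiosqlite, here is a synchronous sketch using the standard library's sqlite3 module. The schema, column names and the open_db, save_run and recent_runs helpers are hypothetical (the article elides the real columns); the idea is the same: one row per DAG run, one row per task run, written in a single transaction. PRAGMA journal_mode=WAL switches a file-backed database to WAL, while an in-memory database keeps its "memory" journal mode.

```python
import sqlite3

# Hypothetical schema; the real column list is elided in the article
SCHEMA = """
CREATE TABLE IF NOT EXISTS dag_runs (
    run_id TEXT PRIMARY KEY,
    dag_id TEXT NOT NULL,
    state  TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS task_runs (
    run_id  TEXT NOT NULL REFERENCES dag_runs(run_id),
    task_id TEXT NOT NULL,
    state   TEXT NOT NULL,
    PRIMARY KEY (run_id, task_id)
);
"""


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")  # no effect on :memory: databases
    conn.executescript(SCHEMA)
    return conn


def save_run(conn, run_id, dag_id, state, task_states):
    with conn:  # one transaction: commits on success, rolls back on error
        conn.execute(
            "INSERT INTO dag_runs (run_id, dag_id, state) VALUES (?, ?, ?)",
            (run_id, dag_id, state),
        )
        conn.executemany(
            "INSERT INTO task_runs (run_id, task_id, state) VALUES (?, ?, ?)",
            [(run_id, t, s) for t, s in task_states.items()],
        )


def recent_runs(conn, dag_id, limit=10):
    # rowid grows with each insert, so DESC gives newest first
    return conn.execute(
        "SELECT run_id, state FROM dag_runs WHERE dag_id = ? ORDER BY rowid DESC LIMIT ?",
        (dag_id, limit),
    ).fetchall()


conn = open_db()
save_run(conn, "run-1", "etl_pipeline", "success", {"extract": "success", "load": "success"})
save_run(conn, "run-2", "etl_pipeline", "failed", {"extract": "failed"})
print(recent_runs(conn, "etl_pipeline"))  # [('run-2', 'failed'), ('run-1', 'success')]
```

Passing a file path such as "taskflow.db" to open_db would enable WAL, letting readers query history while a run is being written.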


Step 6: Cron Scheduler

The built-in scheduler parses standard cron expressions:

class CronExpression:
    def __init__(self, expression):
        # Parse "0 */6 * * *" into sets of valid values
        parts = expression.split()
        self.minute = self._parse_field(parts[0], 0, 59)
        self.hour = self._parse_field(parts[1], 0, 23)
        # ... day, month, weekday

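    def matches(self, dt):
        # Cron numbers weekdays 0 = Sunday, but datetime.weekday() uses
        # 0 = Monday; isoweekday() % 7 converts (assuming self.weekday
        # stores standard cron numbering)
        return (dt.minute in self.minute
                and dt.hour in self.hour
                and dt.day in self.day
                and dt.month in self.month
                and (dt.isoweekday() % 7) in self.weekday)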

Supports wildcards (*), steps (*/6), ranges (9-17), and lists (1,15,28).
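
The article does not show _parse_field. One way to support those four syntaxes, plus ranges with steps such as 10-20/5, is to expand each comma-separated item into a set of integers. This is a hypothetical standalone parse_field(field, lo, hi), a sketch rather than TaskFlow's method:

```python
def parse_field(field: str, lo: int, hi: int) -> set:
    """Expand one cron field into the set of allowed values in [lo, hi].

    Hypothetical helper. Supports '*', '*/n', 'a', 'a-b', 'a-b/n',
    'a/n' and comma-separated lists of these.
    """
    values = set()
    for item in field.split(","):
        base, _, step_text = item.partition("/")
        step = int(step_text) if step_text else 1
        if step <= 0:
            raise ValueError(f"Invalid step in {item!r}")
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            start, end = (int(x) for x in base.split("-", 1))
        else:
            start = int(base)
            # 'a/n' means "from a up to the maximum, every n"
            end = hi if step_text else start
        if not (lo <= start <= end <= hi):
            raise ValueError(f"{item!r} out of range {lo}-{hi}")
        values.update(range(start, end + 1, step))
    return values


print(sorted(parse_field("*/6", 0, 23)))     # [0, 6, 12, 18]
print(sorted(parse_field("9-17", 0, 23)))    # [9, 10, 11, 12, 13, 14, 15, 16, 17]
print(sorted(parse_field("1,15,28", 1, 31))) # [1, 15, 28]
```

Storing each field as a set keeps matches() to five constant-time membership checks per timestamp.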


The REST API

FastAPI gives us automatic OpenAPI docs and async support:

from fastapi import FastAPI

app = FastAPI()

@app.post("/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
    dag = DAG.get(dag_id)
    executor = Executor()
    # The whole DAG runs before the HTTP response is sent
    dag_run = await executor.execute(dag)
    await save_dag_run(dag_run)
    return {
        "run_id": dag_run.run_id,
        "state": dag_run.state.value,
        "summary": dag_run.summary(),
    }

The API also exposes endpoints for listing DAGs, triggering runs, querying run history, and fetching task-level details.


Testing: 107 Tests in 0.94s

Comprehensive test coverage across all components:

Module              Tests  What's covered
test_graph.py          18  Topological sort, cycles, levels, critical path
test_dag.py            14  Creation, validation, registry, context manager
test_executor.py       17  Execution, failures, timeouts, trigger rules, async
test_retry.py          10  Backoff, jitter, exhaustion
test_store.py           9  Save, query, filter, cleanup
test_scheduler.py      18  Cron parsing, matching, scheduling
test_api.py            11  All REST endpoints

Key testing patterns:

  • In-memory SQLite for fast, isolated storage tests
  • ASGITransport from httpx for testing FastAPI without a server
  • DAG registry cleanup between tests to prevent leakage
  • pytest-asyncio with asyncio_mode = "auto" for clean async tests

What I Learned

  1. Kahn's algorithm is elegant — just in-degree tracking and a queue. O(V+E) and naturally detects cycles.

  2. Trigger rules change everything: ALL_DONE for cleanup, ONE_SUCCESS for fan-in patterns, NONE_FAILED for conditional logic. Together with ALL_SUCCESS, these four rules cover most real-world patterns.

  3. Jitter in retries matters — without it, all failed tasks retry at the same time (thundering herd). A ±25% jitter spreads the load.

  4. asyncio.Semaphore is perfect for concurrency limits — combined with asyncio.gather() for level-based parallelism.

  5. SQLite + WAL mode is surprisingly capable — for single-process orchestrators, it's all you need. No Postgres required.


Try It

git clone https://github.com/hajirufai/taskflow.git
cd taskflow
pip install -e ".[dev]"
pytest tests/ -v
python examples/etl_pipeline.py

The full source is ~3,100 lines of Python with 107 tests. It's a great foundation for understanding how workflow orchestrators work under the hood.

GitHub: hajirufai/taskflow


Building things from scratch is the fastest way to truly understand them. If you're interviewing for data engineering roles, being able to explain topological sort and trigger rules from first principles sets you apart.
