Michael Saparov

Building a simple async scheduler with generators in Python

Asyncio in Python is built on coroutines and an event loop — a mechanism that manages their execution.

In this article, we’ll build a simplified model of this approach and recreate similar behavior using generators (yield). This makes it easy to see how tasks pause execution and hand control back to the scheduler, without any “magic” behind async/await. Throughout the article, we treat generators as coroutines.
We will:

  • build a primitive task scheduler
  • understand how tasks “pause”
  • add time-based waiting
  • and end up with a simplified version of asyncio

No magic — just yield.

The Simplest Scheduler

Let’s start with a minimal example:

def task1():
    print("Start Apollo-01")
    yield
    print("Start Apollo-02")
    yield
    print("Start Apollo-03")


def task2():
    print("Start Artemis-01")
    yield
    print("Start Artemis-02")
    yield
    print("Start Artemis-03")


tasks = [task1(), task2()]

while tasks:
    new_tasks = []

    for t in tasks:
        try:
            next(t)
            new_tasks.append(t)
        except StopIteration:
            pass

    tasks = new_tasks

What’s happening here

  • task1() and task2() are generator functions, not regular functions: calling them returns generator objects, which we use as coroutines
  • yield is the point where a task pauses
  • next(t) resumes execution until the next yield
  • the while loop is our scheduler (event loop)
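To see the pause/resume cycle in isolation, here is a minimal sketch (the demo generator is only an illustration, not part of the scheduler):

```python
def demo():
    print("step 1")
    yield          # pause here; control returns to the caller
    print("step 2")
    # falling off the end raises StopIteration in the caller


g = demo()   # creates the generator object; no code runs yet
next(g)      # prints "step 1" and pauses at the first yield
try:
    next(g)  # prints "step 2", then the generator finishes
except StopIteration:
    print("demo finished")
```

This is exactly what the while loop above does for every task, only wrapped in a list so several generators take turns.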

Output:

Start Apollo-01
Start Artemis-01
Start Apollo-02
Start Artemis-02
Start Apollo-03
Start Artemis-03

Tasks run in turns, yielding control to each other.

The Problem

This scheduler:

  • cannot wait (sleep)
  • does not handle time
  • simply switches tasks in a loop

There’s no way to say:
“wake me up in 2 seconds”

Adding a Wait Signal

Let’s allow tasks to communicate with the scheduler.

from enum import Enum, auto

class Op(Enum):
    WAIT = auto()

Now define sleep:

def sleep(delay):
    yield Op.WAIT, delay

Now a task can yield not just control, but also an instruction.

Tasks Control the Scheduler

def task():
    print("start")
    yield from sleep(2)
    print("end")

Now the task explicitly tells the scheduler:

  • when to pause
  • and for how long
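A minimal scheduler that understands this instruction could look like the sketch below (run_one is a hypothetical helper name: it drives a single coroutine and simply blocks on every WAIT, which is fine before we add the heap):

```python
import time
from enum import Enum, auto


class Op(Enum):
    WAIT = auto()


def sleep(delay):
    yield Op.WAIT, delay


def run_one(coro):
    # Drive one coroutine to completion, honoring WAIT instructions.
    try:
        while True:
            op, arg = coro.send(None)  # resume until the next yield
            if op == Op.WAIT:
                time.sleep(arg)        # blocking wait is fine for one task
    except StopIteration:
        pass


def task():
    print("start")
    yield from sleep(0.1)  # shortened delay for the demo
    print("end")


run_one(task())
```

With many tasks, blocking on each WAIT in turn would serialize everything — which is exactly the scaling problem addressed next.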

Scaling Problem

Imagine we have 10,000 tasks:

  • one wakes up in 1 second
  • another in 3 seconds
  • another in 0.5 seconds

We need an efficient way to always pick the next task to run.

Heap (Priority Queue)

A heap lets us retrieve the smallest element efficiently (O(log n) per push and pop).

In Python, that’s the heapq module.

We store:

(step_at, index, coroutine)

Where:

  • step_at — the moment the task should resume
  • index — a tiebreaker, so heapq never has to compare two coroutines when step_at values are equal
  • coroutine — the task itself
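A quick sketch of why the index matters: heapq compares tuples element by element, and when two step_at values tie, comparison falls through to the next field. Generators don’t support <, so the integer index stops the comparison before it ever reaches them:

```python
import heapq


def coro():
    yield


work = []
heapq.heappush(work, (3.0, 0, coro()))
heapq.heappush(work, (0.5, 1, coro()))
heapq.heappush(work, (0.5, 2, coro()))  # same step_at: index breaks the tie

order = []
while work:
    step_at, index, c = heapq.heappop(work)
    order.append((step_at, index))

print(order)  # [(0.5, 1), (0.5, 2), (3.0, 0)]
```

Without the index, the two entries with step_at == 0.5 would make heapq try `coro() < coro()` and raise a TypeError.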

Final Example — Rocket Launches

import heapq
import random
import time
from enum import Enum, auto
from typing import Generator

Rocket = tuple[str, float, int]


class Op(Enum):
    WAIT = auto()


def random_delay() -> float:
    return random.random() * 5


def sleep(delay):
    yield Op.WAIT, delay


def now():
    return time.time()


def random_countdown() -> int:
    return random.randrange(5)


def launch_rocket(rocket_name: str, delay: float, countdown: int):
    yield from sleep(delay)

    for i in reversed(range(countdown)):
        print(f"{rocket_name}: {i + 1}...")
        yield from sleep(1)

    print(f"Rocket {rocket_name} is launched")


def create_rockets(n: int = 10_000) -> Generator[Rocket, None, None]:
    for i in range(n):
        yield (f"Artemis-{i}", random_delay(), random_countdown())


def run():
    rockets = create_rockets()

    work = [
        (
            now(),
            index,
            launch_rocket(name, delay, countdown),
        )
        for index, (name, delay, countdown) in enumerate(rockets)
    ]

    heapq.heapify(work)

    while work:
        step_at, index, coro = heapq.heappop(work)

        wait = step_at - now()
        if wait > 0:
            time.sleep(wait)

        try:
            op, arg = coro.send(None)
        except StopIteration:
            continue

        if op == Op.WAIT:
            step_at = now() + arg
            heapq.heappush(work, (step_at, index, coro))


if __name__ == "__main__":
    run()

How It Works

  • All tasks are placed into a heap
  • We pick the one with the smallest step_at
  • If needed — we wait (time.sleep)
  • Resume execution (send(None))
  • The task either:
    - yields WAIT → reschedule
    - raises StopIteration → task is finished

Calling send(None) is equivalent to next() — it resumes the coroutine until the next yield.

Conclusion

We’ve built a simplified model of asyncio:

  • yield — pause point
  • generator — coroutine
  • loop — event loop
  • heapq — time-based scheduler

And most importantly — no magic.

async/await is just a more convenient syntax built on top of these same ideas.
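For comparison, here is a sketch of the same countdown idea written with real asyncio (names and the shortened tick are illustrative):

```python
import asyncio


async def launch_rocket(name: str, delay: float, countdown: int):
    await asyncio.sleep(delay)           # the event loop reschedules us
    for i in reversed(range(countdown)):
        print(f"{name}: {i + 1}...")
        await asyncio.sleep(0.1)         # shortened tick for the demo
    print(f"Rocket {name} is launched")


async def main():
    # gather() runs the coroutines concurrently on one event loop
    await asyncio.gather(
        launch_rocket("Apollo", 0.2, 3),
        launch_rocket("Artemis", 0.1, 3),
    )


asyncio.run(main())
```

Here `await asyncio.sleep(...)` plays the role of our `yield from sleep(...)`, and asyncio’s event loop plays the role of the heap-based while loop.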
