Margaret found Timothy in the library's coordination room, staring at a wall covered in task dependency charts. "The Event pattern works beautifully for simple signals," he said, "but I've hit something more complex. I need tasks to wait for specific conditions to become true, then wake up and check again. Events are too blunt - they're either set or not set. I need something more nuanced."
"Ah," Margaret smiled, "you need a Condition. Think of it as a smart waiting room where tasks can sleep until they're told that something interesting has changed, then wake up to check if the change matters to them specifically."
The Problem: When Events Aren't Enough
Timothy showed Margaret his code for managing a shared resource with capacity limits:
import asyncio
# Attempt 1: Using Event (doesn't quite work)
class SharedResource:
    def __init__(self, capacity):
        self.capacity = capacity
        self.in_use = 0
        self.available_event = asyncio.Event()
        self.available_event.set()  # Initially available
    async def acquire(self):
        while True:
            await self.available_event.wait()
            if self.in_use < self.capacity:
                self.in_use += 1
                if self.in_use >= self.capacity:
                    self.available_event.clear()
                return
            # Another task took the last slot first and cleared the event,
            # so loop around and wait again. Every release() wakes *all* waiters.
    async def release(self):
        self.in_use -= 1
        self.available_event.set()
async def demonstrate_event_problem():
    resource = SharedResource(capacity=2)
    async def worker(worker_id):
        print(f"Worker {worker_id} trying to acquire")
        await resource.acquire()
        print(f"Worker {worker_id} acquired resource")
        await asyncio.sleep(2)
        await resource.release()
        print(f"Worker {worker_id} released resource")
    # Create 5 workers competing for 2 slots
    await asyncio.gather(*[worker(i) for i in range(5)])
# asyncio.run(demonstrate_event_problem())
"The problem," Timothy explained, "is that Event just signals 'something changed' but doesn't help coordinate multiple tasks checking different conditions. I get race conditions and spurious wakeups."
Margaret nodded. "Events are binary flags. You need a Condition, which combines a Lock with a notification system. It's like a waiting room with a sophisticated paging system."
Understanding Condition Variables
"A Condition," Margaret explained, "is built on three core operations:
- wait() - Release the lock, sleep until notified, then reacquire the lock before returning
- notify() - Wake up one waiting task
- notify_all() - Wake up all waiting tasks

The key insight: tasks can wait for different conditions on the same lock."
She showed Timothy the corrected code:
import asyncio
class SharedResourceWithCondition:
    def __init__(self, capacity):
        self.capacity = capacity
        self.in_use = 0
        self.condition = asyncio.Condition()
    async def acquire(self):
        async with self.condition:
            # Wait until capacity is available
            while self.in_use >= self.capacity:
                await self.condition.wait()
            self.in_use += 1
            print(f"Acquired. In use: {self.in_use}/{self.capacity}")
    async def release(self):
        async with self.condition:
            self.in_use -= 1
            print(f"Released. In use: {self.in_use}/{self.capacity}")
            # Notify one waiting task
            self.condition.notify()
async def demonstrate_condition():
    resource = SharedResourceWithCondition(capacity=2)
    async def worker(worker_id):
        print(f"Worker {worker_id} trying to acquire")
        await resource.acquire()
        print(f"Worker {worker_id} got resource, working...")
        await asyncio.sleep(1)
        await resource.release()
        print(f"Worker {worker_id} done")
    # Create 5 workers competing for 2 slots
    await asyncio.gather(*[worker(i) for i in range(5)])
asyncio.run(demonstrate_condition())
Output:
Worker 0 trying to acquire
Acquired. In use: 1/2
Worker 0 got resource, working...
Worker 1 trying to acquire
Acquired. In use: 2/2
Worker 1 got resource, working...
Worker 2 trying to acquire
Worker 3 trying to acquire
Worker 4 trying to acquire
Released. In use: 1/2
Worker 0 done
...
From here on, each "Released" line is followed by a waiting worker's "Acquired" and "got resource" lines. The exact interleaving depends on how the timers are scheduled.
The Wait Pattern: Always Use a While Loop
Timothy noticed the while loop in the acquire() method. "Why not just if?" he asked.
"Critical detail," Margaret emphasized. "You must always use while with wait(), never if. Here's why:
# WRONG - Don't do this
async def acquire_wrong(self):
    async with self.condition:
        if self.in_use >= self.capacity:  # ❌ WRONG
            await self.condition.wait()
        self.in_use += 1
# RIGHT - Always do this
async def acquire_right(self):
    async with self.condition:
        while self.in_use >= self.capacity:  # ✅ CORRECT
            await self.condition.wait()
        self.in_use += 1
"Three reasons for the while loop:
- Spurious wakeups: The task might wake up even though nothing changed
- notify_all(): Multiple tasks wake up, but only some conditions are met
- Condition changes: Between notification and waking, another task might have changed things

"The while loop rechecks the condition after every wakeup. It's defensive programming that prevents subtle bugs."
Producer-Consumer Pattern
"Let's look at a classic use case," Margaret said, showing Timothy a producer-consumer queue:
import asyncio
from collections import deque
class BoundedQueue:
    def __init__(self, max_size):
        self.max_size = max_size
        self.queue = deque()
        self.condition = asyncio.Condition()
    async def put(self, item):
        async with self.condition:
            # Wait while queue is full
            while len(self.queue) >= self.max_size:
                await self.condition.wait()
            self.queue.append(item)
            print(f"Produced: {item}. Queue size: {len(self.queue)}")
            # Producers and consumers wait on the same Condition, so wake
            # everyone; a lone notify() could wake another producer instead
            self.condition.notify_all()
    async def get(self):
        async with self.condition:
            # Wait while queue is empty
            while len(self.queue) == 0:
                await self.condition.wait()
            item = self.queue.popleft()
            print(f"Consumed: {item}. Queue size: {len(self.queue)}")
            # Wake everyone for the same reason; each waiter re-checks its loop
            self.condition.notify_all()
            return item
async def demonstrate_producer_consumer():
    queue = BoundedQueue(max_size=3)
    async def producer(producer_id):
        for i in range(5):
            item = f"P{producer_id}-{i}"
            await queue.put(item)
            await asyncio.sleep(0.5)  # Slow producer
    async def consumer(consumer_id):
        for _ in range(5):
            item = await queue.get()
            await asyncio.sleep(1)  # Slower consumer
    # Two producers, two consumers
    await asyncio.gather(
        producer(1),
        producer(2),
        consumer(1),
        consumer(2)
    )
asyncio.run(demonstrate_producer_consumer())
Output:
Produced: P1-0. Queue size: 1
Produced: P2-0. Queue size: 2
Consumed: P1-0. Queue size: 1
Consumed: P2-0. Queue size: 0
Produced: P1-1. Queue size: 1
Produced: P2-1. Queue size: 2
...
"Notice," Margaret pointed out, "that producers automatically block when the queue is full, and consumers automatically block when it's empty. The Condition handles all the coordination."
Timothy looked thoughtful. "This seems like a common pattern. Does Python provide something built-in?"
"Absolutely," Margaret said. "Python's asyncio.Queue is production-ready and handles all this for you, plus priorities, task tracking, and more. This example shows how Conditions work under the hood. In real code, just use asyncio.Queue - but understanding this implementation helps you build your own coordination patterns when needed."
notify() vs notify_all()
Timothy asked about the difference between notify() and notify_all(). Margaret created a comparison:
import asyncio
class TaskCoordinator:
    def __init__(self):
        self.ready = False
        self.condition = asyncio.Condition()
    async def wait_for_signal(self, waiter_id):
        async with self.condition:
            print(f"Waiter {waiter_id} waiting...")
            while not self.ready:
                await self.condition.wait()
            print(f"Waiter {waiter_id} proceeding!")
    async def signal_one(self):
        async with self.condition:
            print("Signaling ONE waiter")
            self.ready = True
            self.condition.notify()  # Wakes up one task
    async def signal_all(self):
        async with self.condition:
            print("Signaling ALL waiters")
            self.ready = True
            self.condition.notify_all()  # Wakes up all tasks
async def demonstrate_notify_vs_notify_all():
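    print("=== Using notify() - only one wakes up ===")
    coordinator = TaskCoordinator()
    # Wrap each coroutine in a task so the waiters actually start running
    waiters = [
        asyncio.create_task(coordinator.wait_for_signal(i))
        for i in range(3)
    ]
    await asyncio.sleep(0.1)  # Let waiters start
    await coordinator.signal_one()
    await asyncio.sleep(0.1)
    # The other two waiters were never notified; cancel them so they don't linger
    for task in waiters:
        task.cancel()
    await asyncio.gather(*waiters, return_exceptions=True)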
    print("\n=== Using notify_all() - all wake up ===")
    coordinator = TaskCoordinator()
    waiters = [
        asyncio.create_task(coordinator.wait_for_signal(i))
        for i in range(3)
    ]
    await asyncio.sleep(0.1)  # Let waiters start
    await coordinator.signal_all()
    await asyncio.gather(*waiters)
asyncio.run(demonstrate_notify_vs_notify_all())
Output:
=== Using notify() - only one wakes up ===
Waiter 0 waiting...
Waiter 1 waiting...
Waiter 2 waiting...
Signaling ONE waiter
Waiter 0 proceeding!
=== Using notify_all() - all wake up ===
Waiter 0 waiting...
Waiter 1 waiting...
Waiter 2 waiting...
Signaling ALL waiters
Waiter 0 proceeding!
Waiter 1 proceeding!
Waiter 2 proceeding!
"Use notify()," Margaret explained, "when only one waiting task should proceed - like in a bounded queue where releasing one slot means one producer can add one item. Use notify_all() when multiple tasks might want to proceed - like broadcasting that a shutdown has been initiated."
Real-World Pattern: Read-Write Lock
"Here's a sophisticated example," Margaret said, "a read-write lock that allows multiple readers but exclusive writers:"
import asyncio
class ReadWriteLock:
    def __init__(self):
        self.readers = 0
        self.writer = False
        self.condition = asyncio.Condition()
    async def acquire_read(self):
        async with self.condition:
            # Wait while a writer is active
            while self.writer:
                await self.condition.wait()
            self.readers += 1
    async def release_read(self):
        async with self.condition:
            self.readers -= 1
            # If no more readers, notify waiting writer
            if self.readers == 0:
                self.condition.notify()
    async def acquire_write(self):
        async with self.condition:
            # Wait while readers or writer are active
            while self.readers > 0 or self.writer:
                await self.condition.wait()
            self.writer = True
    async def release_write(self):
        async with self.condition:
            self.writer = False
            # Notify all waiting tasks (both readers and writers)
            self.condition.notify_all()
class SharedDocument:
    def __init__(self):
        self.content = "Initial content"
        self.lock = ReadWriteLock()
    async def read(self, reader_id):
        await self.lock.acquire_read()
        try:
            print(f"Reader {reader_id} reading: {self.content}")
            await asyncio.sleep(1)
            return self.content
        finally:
            await self.lock.release_read()
    async def write(self, writer_id, new_content):
        await self.lock.acquire_write()
        try:
            print(f"Writer {writer_id} writing: {new_content}")
            self.content = new_content
            await asyncio.sleep(2)
        finally:
            await self.lock.release_write()
async def demonstrate_read_write_lock():
    doc = SharedDocument()
    async def reader(reader_id):
        for i in range(3):
            await doc.read(reader_id)
            await asyncio.sleep(0.5)
    async def writer(writer_id):
        await asyncio.sleep(1)
        await doc.write(writer_id, f"Content from writer {writer_id}")
    # Multiple readers can read concurrently
    # Writers get exclusive access
    await asyncio.gather(
        reader(1),
        reader(2),
        writer(1),
        reader(3)
    )
asyncio.run(demonstrate_read_write_lock())
Output:
Reader 1 reading: Initial content
Reader 2 reading: Initial content
Reader 3 reading: Initial content
Writer 1 writing: Content from writer 1
Reader 1 reading: Content from writer 1
Reader 2 reading: Content from writer 1
Reader 3 reading: Content from writer 1
...
"Notice," Margaret said, "that multiple readers can access the document simultaneously, but when a writer needs access, it waits for all readers to finish, then gets exclusive access."
Timothy frowned. "But what if readers keep arriving? Won't the writer wait forever?"
"Good catch," Margaret nodded. "This implementation can starve writers if readers continuously arrive. In production, you'd want writer priority - track waiting writers and block new readers when a writer is waiting. This example demonstrates the Condition pattern, not production-ready read-write locks."
Common Patterns and Idioms
Margaret showed Timothy a reference sheet of common Condition patterns:
import asyncio
class ConditionPatterns:
    def __init__(self):
        self.condition = asyncio.Condition()
        self.state = 0
    # Pattern 1: Wait for specific value
    async def wait_for_value(self, target_value):
        async with self.condition:
            while self.state != target_value:
                await self.condition.wait()
            print(f"State reached {target_value}")
    # Pattern 2: Wait for predicate
    async def wait_for_predicate(self, predicate):
        async with self.condition:
            while not predicate(self.state):
                await self.condition.wait()
            print("Predicate satisfied")
    # Pattern 3: Notify with state change
    async def update_state(self, new_state):
        async with self.condition:
            self.state = new_state
            self.condition.notify_all()
    # Pattern 4: Timeout on wait
    async def wait_with_timeout(self, target_value, timeout):
        async with self.condition:
            try:
                await asyncio.wait_for(
                    self._wait_for_value_internal(target_value),
                    timeout=timeout
                )
                return True
            except asyncio.TimeoutError:
                print("Wait timed out")
                return False
    async def _wait_for_value_internal(self, target_value):
        while self.state != target_value:
            await self.condition.wait()
async def demonstrate_patterns():
    patterns = ConditionPatterns()
    async def waiter():
        await patterns.wait_for_value(5)
    async def updater():
        for i in range(6):
            await asyncio.sleep(0.5)
            await patterns.update_state(i)
    await asyncio.gather(waiter(), updater())
asyncio.run(demonstrate_patterns())
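Patterns 1, 2, and 4 can also lean on the built-in Condition.wait_for(predicate), which runs the while loop internally. The sketch below is one possible arrangement, with wait_until and demo as made-up helper names: the lock is acquired and released inside the coroutine that asyncio.wait_for wraps, so the timeout covers the whole wait.

```python
import asyncio


async def wait_until(cond, predicate, timeout):
    """Hypothetical helper: True if predicate became true before the timeout."""

    async def locked_wait():
        async with cond:
            # Equivalent to: while not predicate(): await cond.wait()
            await cond.wait_for(predicate)

    try:
        await asyncio.wait_for(locked_wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    cond = asyncio.Condition()
    state = {"value": 0}

    async def set_value(value, delay):
        await asyncio.sleep(delay)
        async with cond:
            state["value"] = value
            cond.notify_all()

    setter = asyncio.create_task(set_value(5, 0.05))
    reached = await wait_until(cond, lambda: state["value"] == 5, timeout=1.0)
    missed = await wait_until(cond, lambda: state["value"] == 99, timeout=0.1)
    await setter
    return reached, missed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first wait succeeds once the setter publishes 5; the second gives up after its timeout because nothing ever sets 99.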
When to Use Condition vs Other Primitives
Timothy pulled out his notes. "So when do I use each primitive?"
Margaret created a decision tree:
"""
Decision Tree for Async Synchronization:
Need to signal a one-time event?
└─> Use Event
    Example: "Initialization complete"
Need to limit concurrent access to N resources?
└─> Use Semaphore(N)
    Example: Database connection pool
Need exclusive access to shared state?
└─> Use Lock
    Example: Updating a counter
Need to wait for complex conditions on shared state?
└─> Use Condition
    Example: "Wait until queue has at least 5 items"
             "Wait until all workers are idle"
             "Wait until value is in specific range"
Need ordered message passing between tasks?
└─> Use asyncio.Queue
    Example: Producer-consumer pipeline
"""
"Use Condition," Margaret emphasized, "when you need to coordinate around complex state changes. It's more powerful than Event or Lock alone, but also more complex. If a simpler primitive works, use that instead."
Common Pitfalls
"Before you start using Conditions everywhere," Margaret warned, "let me show you the common mistakes:"
import asyncio
class CommonMistakes:
    def __init__(self):
        self.condition = asyncio.Condition()
        self.ready = False
    # MISTAKE 1: Using if instead of while
    async def mistake_using_if(self):
        async with self.condition:
            if not self.ready:  # ❌ WRONG
                await self.condition.wait()
            # Might still not be ready due to spurious wakeup!
    # MISTAKE 2: Forgetting the async with
    async def mistake_no_context_manager(self):
        # ❌ WRONG - Must use async with
        while not self.ready:
            await self.condition.wait()  # Raises RuntimeError: lock not held
    # MISTAKE 3: Notifying without holding the lock
    async def mistake_notify_without_lock(self):
        self.ready = True
        # ❌ WRONG - notify() raises RuntimeError unless called inside async with
        self.condition.notify()
    # MISTAKE 4: Modifying state outside the lock
    async def mistake_state_outside_lock(self):
        self.ready = True  # ❌ WRONG - Race condition
        async with self.condition:
            self.condition.notify()
    # CORRECT PATTERN (waiting side)
    async def correct_wait(self):
        async with self.condition:
            # 1. Use while, not if
            while not self.ready:
                await self.condition.wait()
    # CORRECT PATTERN (signaling side)
    async def correct_signal(self):
        async with self.condition:
            # 2. Modify state inside the lock
            self.ready = True
            # 3. Notify inside the lock
            self.condition.notify_all()
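Mistakes 2 and 3 are at least loud: asyncio raises RuntimeError when wait() or notify() is called without the lock held. A quick sketch to confirm this, with check_lock_required as a made-up helper name:

```python
import asyncio


async def check_lock_required():
    cond = asyncio.Condition()
    raised = []

    try:
        cond.notify()                 # lock not held
    except RuntimeError:
        raised.append("notify")

    try:
        await cond.wait()             # lock not held
    except RuntimeError:
        raised.append("wait")

    async with cond:
        cond.notify()                 # fine: lock is held (no waiters to wake)

    return raised


if __name__ == "__main__":
    print(asyncio.run(check_lock_required()))
```

Mistakes 1 and 4 raise nothing, which is why they are the ones worth watching for in review.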
Testing Conditions
"Finally," Margaret said, "let's talk about testing code that uses Conditions:"
import asyncio
import pytest  # @pytest.mark.asyncio requires the pytest-asyncio plugin
class TaskQueue:
    def __init__(self):
        self.tasks = []
        self.condition = asyncio.Condition()
    async def add_task(self, task):
        async with self.condition:
            self.tasks.append(task)
            # Waiters may need different minimum counts, so wake them all to re-check
            self.condition.notify_all()
    async def wait_for_tasks(self, minimum_count):
        async with self.condition:
            while len(self.tasks) < minimum_count:
                await self.condition.wait()
            return self.tasks[:minimum_count]
@pytest.mark.asyncio
async def test_task_queue_waits():
    queue = TaskQueue()
    received_tasks = []
    async def waiter():
        tasks = await queue.wait_for_tasks(3)
        received_tasks.extend(tasks)
    async def producer():
        await asyncio.sleep(0.1)
        await queue.add_task("task1")
        await asyncio.sleep(0.1)
        await queue.add_task("task2")
        await asyncio.sleep(0.1)
        await queue.add_task("task3")
    # Run waiter and producer concurrently
    await asyncio.gather(waiter(), producer())
    # Verify waiter received the tasks
    assert len(received_tasks) == 3
    assert received_tasks == ["task1", "task2", "task3"]
@pytest.mark.asyncio
async def test_condition_timeout():
    queue = TaskQueue()
    # Wait for tasks that will never arrive
    async def waiter():
        try:
            await asyncio.wait_for(
                queue.wait_for_tasks(10),
                timeout=0.5
            )
            return False  # Should not reach here
        except asyncio.TimeoutError:
            return True  # Expected timeout
    result = await waiter()
    assert result is True
# Run tests with: pytest test_conditions.py -v
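If installing a pytest plugin is not an option, the standard library's unittest.IsolatedAsyncioTestCase can run the same kind of test. The sketch below repeats a small TaskQueue so that it runs on its own; the test class and method names are illustrative. It also checks that the waiter is still blocked before enough tasks have arrived, which a plain result assertion would not catch.

```python
import asyncio
import unittest


class TaskQueue:
    """A copy of the TaskQueue idea, repeated so this file is self-contained."""

    def __init__(self):
        self.tasks = []
        self.condition = asyncio.Condition()

    async def add_task(self, task):
        async with self.condition:
            self.tasks.append(task)
            self.condition.notify_all()

    async def wait_for_tasks(self, minimum_count):
        async with self.condition:
            while len(self.tasks) < minimum_count:
                await self.condition.wait()
            return self.tasks[:minimum_count]


class TaskQueueTests(unittest.IsolatedAsyncioTestCase):
    async def test_waiter_blocks_until_enough_tasks(self):
        queue = TaskQueue()
        waiter = asyncio.create_task(queue.wait_for_tasks(2))

        await queue.add_task("task1")
        await asyncio.sleep(0)              # give the waiter a turn
        self.assertFalse(waiter.done())     # one task is not enough yet

        await queue.add_task("task2")
        # The timeout keeps a broken implementation from hanging the test run
        result = await asyncio.wait_for(waiter, timeout=1)
        self.assertEqual(result, ["task1", "task2"])
```

Saved to a file, this can be run with python -m unittest followed by the file name.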
Timothy studied the code. "So Conditions are like a sophisticated waiting room where tasks can sleep until specific conditions are met, and they always recheck their condition after waking up."
"Exactly," Margaret smiled. "They're the right tool when you need coordination around complex shared state. Master this pattern, and you'll handle the most sophisticated async coordination scenarios Python throws at you."
Key Takeaways
- Use Condition for complex coordination around shared state
- Always use while, never if, when checking conditions in wait()
- Hold the lock when modifying state and notifying
- Use notify() to wake one task, notify_all() to wake all tasks
- Combine with async with to automatically acquire/release the lock
- Test with timeouts to prevent hanging in tests

With Conditions in his toolkit, Timothy had the most powerful coordination primitive asyncio offers - ready for the most complex multi-task scenarios.
Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.