Aaron Rose

The Secret Life of Python: The Barrier - Synchronizing Groups at Checkpoints

Timothy burst into Margaret's office with his laptop. "I've got a distributed data processing system - five worker tasks each processing their chunk of data. But here's the problem: they all need to finish Phase 1 before any of them can start Phase 2. Right now, the fast workers race ahead while slow ones lag behind, and everything gets corrupted."

Margaret smiled. "You need a Barrier. Think of it as a gate that won't open until everyone has arrived. Perfect for coordinating groups of tasks that must move through phases together."

The Problem: Uncoordinated Phase Transitions

Timothy showed Margaret his broken code:

import asyncio
import random

class DataProcessor:
    def __init__(self):
        self.phase_1_results = []
        self.phase_2_results = []

    async def process_chunk(self, worker_id, data_chunk):
        # Phase 1: Initial processing
        print(f"Worker {worker_id} starting Phase 1")
        await asyncio.sleep(random.uniform(0.5, 2.0))  # Simulate work
        result_1 = f"Phase1-W{worker_id}"
        self.phase_1_results.append(result_1)
        print(f"Worker {worker_id} finished Phase 1")

        # Phase 2: Depends on ALL Phase 1 results
        # BUG: Fast workers start Phase 2 before slow workers finish Phase 1!
        print(f"Worker {worker_id} starting Phase 2")
        await asyncio.sleep(0.5)
        # This worker might see incomplete phase_1_results!
        result_2 = f"Phase2-W{worker_id} (saw {len(self.phase_1_results)} results)"
        self.phase_2_results.append(result_2)
        print(f"Worker {worker_id} finished Phase 2")

async def demonstrate_broken_phases():
    processor = DataProcessor()

    workers = [
        processor.process_chunk(i, f"chunk-{i}")
        for i in range(5)
    ]

    await asyncio.gather(*workers)

    print(f"\nPhase 1 results: {len(processor.phase_1_results)}")
    print(f"Phase 2 results: {processor.phase_2_results}")

# asyncio.run(demonstrate_broken_phases())

Output (broken):

Worker 0 starting Phase 1
Worker 1 starting Phase 1
Worker 2 starting Phase 1
Worker 3 starting Phase 1
Worker 4 starting Phase 1
Worker 0 finished Phase 1
Worker 0 starting Phase 2  # ❌ Started Phase 2 too early!
Worker 0 finished Phase 2
Worker 2 finished Phase 1
Worker 2 starting Phase 2  # ❌ Only sees 2 Phase 1 results!
Worker 2 finished Phase 2
Worker 1 finished Phase 1
...

"See?" Timothy pointed. "Worker 0 finishes Phase 1 quickly and races into Phase 2, but it only sees partial results. I need all workers to wait at the boundary between phases."

Enter the Barrier

"This is exactly what asyncio.Barrier solves," Margaret explained. "It was added in Python 3.11. A Barrier is initialized with a count - the number of tasks that must arrive. When a task awaits wait(), it is suspended until all N tasks have arrived. Then they all proceed together."

She rewrote Timothy's code:

import asyncio
import random

class DataProcessorWithBarrier:
    def __init__(self, num_workers):
        self.phase_1_results = []
        self.phase_2_results = []
        self.barrier = asyncio.Barrier(num_workers)

    async def process_chunk(self, worker_id, data_chunk):
        # Phase 1: Initial processing
        print(f"Worker {worker_id} starting Phase 1")
        await asyncio.sleep(random.uniform(0.5, 2.0))
        result_1 = f"Phase1-W{worker_id}"
        self.phase_1_results.append(result_1)
        print(f"Worker {worker_id} finished Phase 1, waiting at barrier...")

        # Wait for ALL workers to complete Phase 1
        await self.barrier.wait()

        # Phase 2: Now ALL Phase 1 results are guaranteed available
        print(f"Worker {worker_id} starting Phase 2")
        await asyncio.sleep(0.5)
        result_2 = f"Phase2-W{worker_id} (saw {len(self.phase_1_results)} results)"
        self.phase_2_results.append(result_2)
        print(f"Worker {worker_id} finished Phase 2")

async def demonstrate_barrier():
    num_workers = 5
    processor = DataProcessorWithBarrier(num_workers)

    workers = [
        processor.process_chunk(i, f"chunk-{i}")
        for i in range(num_workers)
    ]

    await asyncio.gather(*workers)

    print(f"\nAll workers saw {len(processor.phase_1_results)} Phase 1 results")
    print(f"Phase 2 results: {processor.phase_2_results}")

asyncio.run(demonstrate_barrier())

Output (correct):

Worker 0 starting Phase 1
Worker 1 starting Phase 1
Worker 2 starting Phase 1
Worker 3 starting Phase 1
Worker 4 starting Phase 1
Worker 0 finished Phase 1, waiting at barrier...
Worker 2 finished Phase 1, waiting at barrier...
Worker 3 finished Phase 1, waiting at barrier...
Worker 1 finished Phase 1, waiting at barrier...
Worker 4 finished Phase 1, waiting at barrier...
Worker 4 starting Phase 2  # ✅ All workers released together!
Worker 0 starting Phase 2
Worker 1 starting Phase 2
Worker 2 starting Phase 2
Worker 3 starting Phase 2
Worker 4 finished Phase 2
Worker 0 finished Phase 2
...

All workers saw 5 Phase 1 results
Phase 2 results: ['Phase2-W4 (saw 5 results)', ...]

"Perfect!" Timothy exclaimed. "Now every worker sees all 5 Phase 1 results before starting Phase 2."

"One detail," Margaret added. "The wait() method returns an integer - the arrival index from 0 to N-1, where 0 is the first task to arrive. You usually don't need this, but it can be useful if you want one specific task to perform cleanup or special handling."

async def worker_with_index(self, worker_id):
    # wait() returns a distinct index (0 to parties - 1) for each task
    arrival_index = await self.barrier.wait()

    # In current CPython the last task to arrive gets the highest index,
    # so exactly one worker takes this branch
    if arrival_index == self.barrier.parties - 1:
        print(f"Worker {worker_id} was last to arrive, doing cleanup...")

Timothy nodded. "So I can use the return value to assign roles, but most of the time I just ignore it."

Multiple Phases with Reusable Barriers

Margaret continued: "The beautiful thing about Barriers is they're reusable. After all tasks pass through, the Barrier automatically resets. You can use the same Barrier for multiple synchronization points."

import asyncio
import random

class MultiPhaseProcessor:
    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.barrier = asyncio.Barrier(num_workers)
        self.phase_results = {1: [], 2: [], 3: []}

    async def process_with_phases(self, worker_id):
        phases = [1, 2, 3]

        for phase in phases:
            # Do work for this phase
            print(f"Worker {worker_id} processing Phase {phase}")
            await asyncio.sleep(random.uniform(0.3, 1.0))

            result = f"P{phase}-W{worker_id}"
            self.phase_results[phase].append(result)

            print(f"Worker {worker_id} finished Phase {phase}, waiting...")

            # Wait for all workers to finish this phase
            await self.barrier.wait()

            print(f"Worker {worker_id} proceeding to next phase")

        print(f"Worker {worker_id} COMPLETE")

async def demonstrate_multiple_phases():
    processor = MultiPhaseProcessor(num_workers=4)

    workers = [
        processor.process_with_phases(i)
        for i in range(4)
    ]

    await asyncio.gather(*workers)

    for phase, results in processor.phase_results.items():
        print(f"Phase {phase}: {len(results)} results")

asyncio.run(demonstrate_multiple_phases())

Output:

Worker 0 processing Phase 1
Worker 1 processing Phase 1
Worker 2 processing Phase 1
Worker 3 processing Phase 1
Worker 2 finished Phase 1, waiting...
Worker 0 finished Phase 1, waiting...
Worker 3 finished Phase 1, waiting...
Worker 1 finished Phase 1, waiting...
Worker 1 proceeding to next phase  # ✅ All released together
Worker 0 proceeding to next phase
Worker 2 proceeding to next phase
Worker 3 proceeding to next phase
Worker 0 processing Phase 2
Worker 1 processing Phase 2
...

"The same Barrier," Margaret emphasized, "coordinates all three phase transitions. It resets automatically after each use."

Barrier Actions: Run Code When All Arrive

Timothy asked, "Can I run some code when all tasks reach the Barrier? Like logging or aggregating results?"

"Excellent question," Margaret said. "threading.Barrier accepts an action callback for exactly that, but asyncio.Barrier takes only the party count. You can get the same effect yourself: wait() hands each task a unique index, so let the task that receives index 0 run the code once, then have everyone meet at the barrier a second time so nobody moves on before it finishes."

import asyncio
import random

class ProcessorWithBarrierAction:
    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.phase_results = []
        self.current_phase = 0

        # asyncio.Barrier accepts only `parties`; it has no `action`
        # parameter, so the callback is emulated in wait_with_action()
        self.barrier = asyncio.Barrier(num_workers)

    def on_barrier_full(self):
        """Called once per phase, after all workers have reached the barrier

        Runs in whichever task received index 0 from wait().
        """
        self.current_phase += 1
        print(f"\n{'='*50}")
        print(f"BARRIER REACHED: Phase {self.current_phase} complete!")
        print(f"Collected {len(self.phase_results)} results this phase")
        print(f"{'='*50}\n")
        self.phase_results.clear()  # Reset for next phase

    async def wait_with_action(self):
        # First rendezvous: returns only after every worker has arrived
        index = await self.barrier.wait()
        if index == 0:  # Indices are unique, so exactly one task runs this
            self.on_barrier_full()
        # Second rendezvous: hold everyone until the action has finished
        await self.barrier.wait()

    async def worker(self, worker_id):
        for phase in range(3):
            print(f"Worker {worker_id} working on phase {phase + 1}")
            await asyncio.sleep(random.uniform(0.2, 0.8))
            self.phase_results.append(f"W{worker_id}-P{phase + 1}")

            # Wait at barrier - action runs once all workers have arrived
            await self.wait_with_action()

            print(f"Worker {worker_id} continuing from phase {phase + 1}")

async def demonstrate_barrier_action():
    processor = ProcessorWithBarrierAction(num_workers=3)

    await asyncio.gather(*[
        processor.worker(i)
        for i in range(3)
    ])

asyncio.run(demonstrate_barrier_action())

Output:

Worker 0 working on phase 1
Worker 1 working on phase 1
Worker 2 working on phase 1

==================================================
BARRIER REACHED: Phase 1 complete!
Collected 3 results this phase
==================================================

Worker 1 continuing from phase 1
Worker 0 continuing from phase 1
Worker 2 continuing from phase 1
...
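
"The checkpoint code runs exactly once per phase," Margaret noted, "after every worker has arrived and before any of them moves on. asyncio.Barrier has no built-in action parameter (that belongs to threading.Barrier), so one task has to be chosen to run it, but the pattern is still perfect for logging, aggregation, or checkpoint saves."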

Synchronized Start: Everyone Begins Together

"Here's another useful pattern," Margaret said. "Sometimes you want all workers to start at exactly the same moment - like runners at a starting line."

import asyncio
import time

class SynchronizedRace:
    def __init__(self, num_runners):
        # Create barrier with num_runners + 1 (coordinator)
        self.barrier = asyncio.Barrier(num_runners + 1)
        self.start_time = None

    async def runner(self, runner_id):
        print(f"Runner {runner_id} ready at starting line")

        # Wait for starting gun
        await self.barrier.wait()

        # All runners start at the same instant
        start_time = time.time()

        # Simulate race
        await asyncio.sleep(runner_id * 0.1)  # Different speeds

        finish_time = time.time()
        elapsed = finish_time - start_time
        print(f"Runner {runner_id} finished in {elapsed:.2f}s")

    async def coordinator(self):
        print("Coordinator: Setting up race...")
        await asyncio.sleep(1)  # Preparation time

        print("\nCoordinator: On your marks... get set...")
        await asyncio.sleep(0.5)

        print("GO!\n")

        # Release all runners simultaneously
        await self.barrier.wait()

async def demonstrate_synchronized_start():
    race = SynchronizedRace(num_runners=4)

    # Start all runners and coordinator
    await asyncio.gather(
        race.coordinator(),
        *[race.runner(i) for i in range(4)]
    )

asyncio.run(demonstrate_synchronized_start())

Output:

Runner 0 ready at starting line
Runner 1 ready at starting line
Runner 2 ready at starting line
Runner 3 ready at starting line
Coordinator: Setting up race...

Coordinator: On your marks... get set...
GO!

Runner 0 finished in 0.00s
Runner 1 finished in 0.10s
Runner 2 finished in 0.20s
Runner 3 finished in 0.30s

"Notice," Margaret pointed out, "the barrier count is 5 (4 runners + 1 coordinator). The coordinator controls when the race starts, and all runners are released simultaneously when the coordinator reaches the barrier."

Real-World Pattern: Map-Reduce

"Let's look at a classic use case," Margaret said. "Map-Reduce operations where workers process chunks independently, then all results must be combined before the next step."

import asyncio
import random

class MapReduceProcessor:
    def __init__(self, num_workers, data_chunks):
        self.num_workers = num_workers
        self.data_chunks = data_chunks
        self.map_results = []
        self.reduce_result = None

        # Two barriers: one after map, one after reduce
        self.map_barrier = asyncio.Barrier(num_workers)
        self.reduce_barrier = asyncio.Barrier(num_workers)

    async def mapper(self, worker_id):
        # Map phase: process assigned chunk
        chunk = self.data_chunks[worker_id]
        print(f"Mapper {worker_id} processing {len(chunk)} items")

        await asyncio.sleep(random.uniform(0.5, 1.5))

        # Compute sum of chunk
        chunk_sum = sum(chunk)
        self.map_results.append(chunk_sum)
        print(f"Mapper {worker_id} computed sum: {chunk_sum}")

        # Wait for all mappers to finish
        await self.map_barrier.wait()

        # Reduce phase: a single worker combines the mapped results
        # (worker 0 here; a real system might instead spread the
        # reduction across several workers)
        if worker_id == 0:
            # Worker 0 does the final reduction
            total = sum(self.map_results)
            self.reduce_result = total
            print(f"\nReducer: Total sum = {total}\n")

        # Wait for reduction to complete
        await self.reduce_barrier.wait()

        # All workers can now see the final result
        print(f"Worker {worker_id} sees final result: {self.reduce_result}")

async def demonstrate_map_reduce():
    # Split data into 4 chunks
    data_chunks = [
        [1, 2, 3, 4],
        [5, 6, 7, 8],
        [9, 10, 11, 12],
        [13, 14, 15, 16]
    ]

    processor = MapReduceProcessor(
        num_workers=len(data_chunks),
        data_chunks=data_chunks
    )

    await asyncio.gather(*[
        processor.mapper(i)
        for i in range(len(data_chunks))
    ])

    print(f"\nFinal answer: {processor.reduce_result}")
    print(f"Expected: {sum(range(1, 17))}")

asyncio.run(demonstrate_map_reduce())

Output:

Mapper 0 processing 4 items
Mapper 1 processing 4 items
Mapper 2 processing 4 items
Mapper 3 processing 4 items
Mapper 0 computed sum: 10
Mapper 2 computed sum: 42
Mapper 1 computed sum: 26
Mapper 3 computed sum: 58

Reducer: Total sum = 136

Worker 0 sees final result: 136
Worker 1 sees final result: 136
Worker 2 sees final result: 136
Worker 3 sees final result: 136

Final answer: 136
Expected: 136

Barrier Properties and Methods

Margaret showed Timothy the key Barrier properties:

import asyncio

async def demonstrate_barrier_properties():
    barrier = asyncio.Barrier(parties=5)

    print(f"Parties required: {barrier.parties}")
    print(f"Currently waiting: {barrier.n_waiting}")
    print(f"Barrier broken: {barrier.broken}")

    # Note: parties cannot be changed after initialization
    # barrier.parties = 10  # ❌ This would raise AttributeError

    ready_event = asyncio.Event()

    async def worker(worker_id):
        print(f"Worker {worker_id} arriving (waiting: {barrier.n_waiting})")

        # Wait for signal to proceed to barrier
        await ready_event.wait()

        await barrier.wait()
        print(f"Worker {worker_id} released")

    # Start 4 of the 5 workers but keep them paused
    workers = [asyncio.create_task(worker(i)) for i in range(4)]

    # Let workers reach their first await
    await asyncio.sleep(0.1)
    print(f"\nBefore barrier: {barrier.n_waiting} workers waiting")

    # Release the 4 workers to the barrier; one party is still missing
    ready_event.set()
    await asyncio.sleep(0.1)

    print(f"At barrier: {barrier.n_waiting} workers waiting")

    # The 5th arrival completes the group and opens the barrier
    workers.append(asyncio.create_task(worker(4)))
    await asyncio.gather(*workers)

    print(f"After release: {barrier.n_waiting} workers waiting")

asyncio.run(demonstrate_barrier_properties())

Output:

Parties required: 5
Currently waiting: 0
Barrier broken: False
Worker 0 arriving (waiting: 0)
Worker 1 arriving (waiting: 0)
Worker 2 arriving (waiting: 0)
Worker 3 arriving (waiting: 0)

Before barrier: 0 workers waiting
At barrier: 4 workers waiting
Worker 4 arriving (waiting: 4)
Worker 4 released
Worker 0 released
Worker 1 released
Worker 2 released
Worker 3 released
After release: 0 workers waiting

"Important detail," Margaret pointed out. "The parties count is fixed at initialization and cannot be changed. If you need a different number of tasks, you must create a new Barrier. This reinforces the concept that Barriers coordinate a fixed group size - you can't add or remove members dynamically."

Handling Barrier Failures: abort() and reset()

"What if something goes wrong?" Timothy asked. "What if a worker crashes before reaching the Barrier?"

Margaret showed him the error handling methods:

import asyncio

class ResilientProcessor:
    def __init__(self, num_workers):
        self.barrier = asyncio.Barrier(num_workers)

    async def worker_that_fails(self, worker_id):
        try:
            print(f"Worker {worker_id} starting")
            await asyncio.sleep(0.5)

            if worker_id == 2:
                raise ValueError("Worker 2 encountered an error!")

            await self.barrier.wait()
            print(f"Worker {worker_id} passed barrier")

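        except asyncio.BrokenBarrierError:
            # Another worker aborted the barrier while we were waiting.
            # Catch this first: BrokenBarrierError is a RuntimeError, so the
            # generic handler below would otherwise catch it too.
            print(f"Worker {worker_id} detected broken barrier")
            raise
        except Exception as e:
            print(f"Worker {worker_id} failed: {e}")
            # Abort the barrier so other workers don't wait forever.
            # abort() is a coroutine, so it must be awaited.
            await self.barrier.abort()
            raise

    async def worker_that_waits(self, worker_id):
        try:
            print(f"Worker {worker_id} starting")
            await asyncio.sleep(0.5)
            await self.barrier.wait()
            print(f"Worker {worker_id} passed barrier")
        except asyncio.BrokenBarrierError:
            print(f"Worker {worker_id} detected broken barrier")
            raise  # Propagate so gather() reports the failure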

async def demonstrate_barrier_abort():
    processor = ResilientProcessor(num_workers=4)

    workers = [
        processor.worker_that_fails(0),
        processor.worker_that_fails(1),
        processor.worker_that_fails(2),  # This one will fail
        processor.worker_that_waits(3)
    ]

    results = await asyncio.gather(*workers, return_exceptions=True)

    print(f"\nBarrier broken: {processor.barrier.broken}")
    print(f"Results: {[type(r).__name__ for r in results]}")

asyncio.run(demonstrate_barrier_abort())

Output:

Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 2 failed: Worker 2 encountered an error!
Worker 0 detected broken barrier
Worker 1 detected broken barrier
Worker 3 detected broken barrier

Barrier broken: True
Results: ['BrokenBarrierError', 'BrokenBarrierError', 'ValueError', 'BrokenBarrierError']

"When abort() runs," Margaret explained, "all waiting tasks get a BrokenBarrierError, and the barrier is marked as broken. Both abort() and reset() are coroutines on asyncio.Barrier, so they must be awaited. After an abort, you can await reset() to restore the barrier for reuse:"

import asyncio

async def demonstrate_barrier_reset():
    barrier = asyncio.Barrier(2)

    # First use
    async def phase_1():
        await barrier.wait()
        print("Phase 1 complete")

    await asyncio.gather(phase_1(), phase_1())

    # Simulate a failure (abort() is a coroutine, so await it)
    await barrier.abort()
    print(f"Barrier broken: {barrier.broken}")

    # Reset and reuse (reset() is a coroutine too)
    await barrier.reset()
    print(f"After reset, broken: {barrier.broken}")

    # Second use
    async def phase_2():
        await barrier.wait()
        print("Phase 2 complete")

    await asyncio.gather(phase_2(), phase_2())

asyncio.run(demonstrate_barrier_reset())

Output:

Phase 1 complete
Phase 1 complete
Barrier broken: True
After reset, broken: False
Phase 2 complete
Phase 2 complete

Timeout Protection: Preventing Infinite Waits

"One more critical pattern," Margaret said. "What if a worker never arrives? You don't want to wait forever."

import asyncio

class TimeoutSafeProcessor:
    def __init__(self, num_workers):
        self.barrier = asyncio.Barrier(num_workers)
        self.completed = []

    async def reliable_worker(self, worker_id):
        try:
            print(f"Worker {worker_id} starting")
            await asyncio.sleep(0.5)

            # Wait at barrier with timeout
            await asyncio.wait_for(
                self.barrier.wait(),
                timeout=2.0
            )

            self.completed.append(worker_id)
            print(f"Worker {worker_id} completed")

        except asyncio.TimeoutError:
            print(f"Worker {worker_id} timed out waiting at barrier")
            # abort() is a coroutine: without await the barrier never breaks
            await self.barrier.abort()  # Don't leave others waiting
        except asyncio.BrokenBarrierError:
            print(f"Worker {worker_id} detected broken barrier")

    async def slow_worker(self, worker_id):
        """This worker takes too long to reach the barrier"""
        print(f"Slow worker {worker_id} starting (will be late)")
        await asyncio.sleep(5)  # Takes too long

        try:
            await self.barrier.wait()
        except asyncio.BrokenBarrierError:
            print(f"Slow worker {worker_id} arrived too late, barrier broken")

async def demonstrate_timeout_protection():
    processor = TimeoutSafeProcessor(num_workers=4)

    # 3 fast workers, 1 slow worker
    await asyncio.gather(
        processor.reliable_worker(0),
        processor.reliable_worker(1),
        processor.reliable_worker(2),
        processor.slow_worker(3),  # This one is too slow
        return_exceptions=True
    )

    print(f"\nCompleted workers: {processor.completed}")
    print(f"Barrier broken: {processor.barrier.broken}")

asyncio.run(demonstrate_timeout_protection())

Output:

Worker 0 starting
Worker 1 starting
Worker 2 starting
Slow worker 3 starting (will be late)
Worker 0 timed out waiting at barrier  # All three share the same 2.0s deadline
Worker 1 timed out waiting at barrier
Worker 2 timed out waiting at barrier
Slow worker 3 arrived too late, barrier broken

Completed workers: []
Barrier broken: True

"Always use timeouts in production," Margaret emphasized. "If a worker crashes or hangs, you don't want the entire system stuck forever. The first worker to time out should await abort() on the barrier to release the others."

Testing Barriers

Margaret showed Timothy how to test Barrier-based code:

import asyncio
import pytest

class SynchronizedSystem:
    def __init__(self, num_workers):
        self.barrier = asyncio.Barrier(num_workers)
        self.phase_1_complete = []
        self.phase_2_complete = []

    async def worker(self, worker_id):
        # Phase 1
        await asyncio.sleep(0.1)
        self.phase_1_complete.append(worker_id)

        await self.barrier.wait()

        # Phase 2 - should only start after ALL phase 1 complete
        assert len(self.phase_1_complete) == self.barrier.parties
        self.phase_2_complete.append(worker_id)

@pytest.mark.asyncio
async def test_barrier_synchronization():
    system = SynchronizedSystem(num_workers=3)

    workers = [system.worker(i) for i in range(3)]
    await asyncio.gather(*workers)

    # Verify all workers completed both phases
    assert len(system.phase_1_complete) == 3
    assert len(system.phase_2_complete) == 3

@pytest.mark.asyncio
async def test_barrier_abort():
    barrier = asyncio.Barrier(3)

    async def failing_worker():
        await barrier.abort()  # abort() is a coroutine and must be awaited

    async def waiting_worker():
        with pytest.raises(asyncio.BrokenBarrierError):
            await barrier.wait()

    await asyncio.gather(
        failing_worker(),
        waiting_worker(),
        waiting_worker()
    )

    assert barrier.broken is True

@pytest.mark.asyncio
async def test_barrier_properties():
    barrier = asyncio.Barrier(5)

    assert barrier.parties == 5
    assert barrier.n_waiting == 0
    assert barrier.broken is False

    async def wait_at_barrier():
        await asyncio.sleep(0.1)
        await barrier.wait()

    # Start 4 workers (not enough to release)
    tasks = [asyncio.create_task(wait_at_barrier()) for _ in range(4)]

    await asyncio.sleep(0.2)
    assert barrier.n_waiting == 4

    # Start 5th worker to release all
    tasks.append(asyncio.create_task(wait_at_barrier()))
    await asyncio.gather(*tasks)

    assert barrier.n_waiting == 0

# Run tests with: pytest test_barriers.py -v (requires the pytest-asyncio plugin)

Common Patterns and Anti-Patterns

Margaret created a reference guide:

"""
✅ GOOD PATTERNS:

1. Multi-phase processing
   barrier = asyncio.Barrier(n)
   for phase in phases:
       do_work()
       await barrier.wait()

2. Synchronized start (all tasks begin together)
   barrier = asyncio.Barrier(n + 1)  # +1 for coordinator
   # In workers:
   await barrier.wait()  # Wait for coordinator
   do_work()
   # In coordinator:
   setup()
   await barrier.wait()  # Release all workers

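3. One task runs logging/checkpoint code at the barrier
   # asyncio.Barrier has no action= parameter (threading.Barrier does)
   index = await barrier.wait()
   if index == 0:
       save_checkpoint()
   await barrier.wait()  # Hold everyone until the checkpoint is saved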

❌ ANTI-PATTERNS:

1. Wrong party count
   barrier = asyncio.Barrier(4)
   # Only start 3 workers - deadlock forever!

2. Nested barriers (potential deadlock)
   await barrier1.wait()
   await barrier2.wait()  # If workers hit these out of order

3. Not handling abort in error cases
   # Worker fails but doesn't await barrier.abort()
   # (calling abort() without await never breaks the barrier)
   # Other workers wait forever

4. Forgetting barrier is reusable
   # Creating new barrier for each phase instead of reusing
"""

When to Use Barriers

Timothy pulled out his decision tree. "When should I use a Barrier versus other primitives?"

Margaret expanded it:

"""
Barrier vs Other Primitives:

Use Barrier when:
✓ N tasks must ALL complete a phase before ANY continues
✓ Multiple synchronization points in workflow
✓ Map-reduce patterns
✓ Simulation steps that must advance together
✓ Coordinated starts (everyone begins simultaneously)

Don't use Barrier when:
✗ Tasks are independent (no synchronization needed)
✗ Only one task needs to wait (use Event)
✗ Producer-consumer pattern (use Queue)
✗ Mutual exclusion only (use Lock)
✗ Complex waiting conditions (use Condition)

Barrier = "Group checkpoint where everyone must meet"
"""

Timothy studied the patterns carefully. "So Barriers are perfect when you have a fixed number of tasks that must move through phases together - like runners at a starting line, or workers in lockstep computation."

"Exactly," Margaret said. "The key insight: Barriers coordinate groups at checkpoints. Master this pattern, and you'll handle complex multi-stage async workflows with confidence."

Key Takeaways

  1. Barriers synchronize groups - N tasks must all arrive before any proceeds
  2. Barriers are reusable - Automatically reset after all tasks pass through
  3. Elect one task via wait()'s return value for checkpoint logic - asyncio.Barrier has no action callback (threading.Barrier does)
  4. Always await abort() if a worker fails, to prevent deadlock
  5. Party count must match - Create exactly N workers for a Barrier(N)
  6. Await reset() to restore a broken barrier for reuse
  7. Test party count and synchronization behavior explicitly

With Barriers in his toolkit, Timothy could now coordinate complex multi-phase async operations where groups of tasks must advance together through stages - the perfect primitive for lockstep computation.


Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.
