DEV Community

Chandrashekhar Kachawa
Chandrashekhar Kachawa

Posted on • Originally published at ctrix.pro

Controlling Concurrency in Python: Semaphores and Pool Workers

Running thousands of tasks concurrently is powerful, but it can also be dangerous. If you make 1,000 simultaneous requests to a web API, you might get rate-limited, blocked, or even crash the service. True robustness comes not just from running tasks in parallel, but from controlling how many run at once.

This guide covers the two primary ways to limit concurrency in Python for different models: asyncio.Semaphore for the asynchronous world, and max_workers for thread and process pools.

1. Limiting Concurrent Coroutines with asyncio.Semaphore

What is it? An asyncio.Semaphore is a synchronization primitive that controls access to a shared resource. You can think of it as a bouncer at a club with a strict capacity. It's initialized with a number (the capacity), and coroutines must "acquire" a permit from it to proceed. If no permits are available, the coroutine waits until another one "releases" its permit.

When to use it: When you are in an asyncio application and need to limit access to a resource. The classic example is controlling access to a rate-limited API.

Syntax Example: Let's simulate making 20 API calls to a service that only allows 5 concurrent connections.

import asyncio
import time

async def api_worker(name: str, semaphore: asyncio.Semaphore):
    """A worker that simulates a rate-limited API call."""
    # The `async with` statement handles acquiring and releasing the semaphore
    async with semaphore:
        print(f"({time.strftime('%X')}) Worker {name}: Acquired permit. Calling API...")
        # Simulate a 2-second network request
        await asyncio.sleep(2)
        print(f"({time.strftime('%X')}) Worker {name}: Finished call. Releasing permit.")
    return f"Result from {name}"

async def main():
    # Create a semaphore that allows only 5 concurrent tasks
    semaphore = asyncio.Semaphore(5)

    # Create 20 tasks that all need to use the semaphore
    tasks = [api_worker(f"Task-{i}", semaphore) for i in range(20)]

    print("Starting all tasks...")
    await asyncio.gather(*tasks)
    print("All tasks complete.")

asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

If you run this code, you will see that only 5 workers ever print "Acquired permit" at a time. The semaphore effectively throttles your coroutines to respect the resource's limit.

2. Limiting Parallelism in Thread and Process Pools

What is it? For the concurrent.futures framework, the mechanism for limiting parallelism is simpler and more static. It's built directly into the ThreadPoolExecutor and ProcessPoolExecutor themselves via the max_workers parameter.

When to use it: When you want to set a fixed, global limit on the number of threads or processes that can run in parallel for the entire lifetime of the pool.

Syntax Example: Let's run 10 CPU-bound jobs but limit our ProcessPoolExecutor to only use 2 processes at a time.

from concurrent.futures import ProcessPoolExecutor
import time
import os

def heavy_calculation(num: int):
    """A silly, CPU-intensive task."""
    print(f"  [PID {os.getpid()}] Starting calculation for {num}...")
    time.sleep(1) # Simulate work
    print(f"  [PID {os.getpid()}] Finished calculation for {num}.")
    return num * num

if __name__ == "__main__":
    print("Starting a process pool with a max of 2 workers.")
    # Limit the pool to only 2 concurrent processes
    with ProcessPoolExecutor(max_workers=2) as executor:
        numbers = range(10)
        # The executor will only run 2 of these tasks at any given moment.
        # It manages the queue of the other 8 pending tasks automatically.
        results = executor.map(heavy_calculation, numbers)

    print("All calculations complete.")
Enter fullscreen mode Exit fullscreen mode

When you run this, you'll see from the process IDs (PIDs) and start/finish messages that only two calculations are ever running at the same time. The executor handles queuing the remaining tasks for you.

Conclusion

Uncontrolled concurrency can be as bad as no concurrency at all. By using tools like asyncio.Semaphore and the max_workers parameter, you can build robust, efficient, and well-behaved applications that respect the limits of the resources they depend on.

  • Use asyncio.Semaphore when you need to throttle coroutines accessing a shared resource.
  • Use max_workers when you need to limit the total number of parallel threads or processes in a pool.

Top comments (0)