Aaron Rose

The Lock: Synchronization Primitives for Shared State

Timothy was excited about his new caching system. The library's book lookup was slow, so he'd built a cache that multiple async tasks could update. But when he ran it, something was wrong.

"Margaret, look at this," Timothy said, pointing at his screen. "My cache statistics are completely off. I'm tracking cache hits and misses, but the numbers don't match what I expect."

import asyncio

# Shared cache state
book_cache = {}
cache_stats = {"hits": 0, "misses": 0, "size": 0}

async def fetch_book(book_id):
    """Fetch book from cache or database"""
    # Check cache
    # PROBLEM: this check and the update below are separated by an await,
    # so concurrent tasks can all miss before any of them fills the cache
    if book_id in book_cache:
        cache_stats["hits"] += 1
        return book_cache[book_id]

    # Cache miss - fetch from "database"
    await asyncio.sleep(0.1)  # Simulate database query
    book_data = {"id": book_id, "title": f"Book {book_id}"}

    # Update cache
    book_cache[book_id] = book_data
    cache_stats["misses"] += 1
    cache_stats["size"] = len(book_cache)

    return book_data

async def broken_cache_demo():
    """Multiple tasks accessing shared cache"""
    print("Fetching 10 books with 10 concurrent tasks...")

    # Request same books multiple times
    book_requests = [1, 2, 3, 1, 2, 3, 1, 2, 3, 4]

    tasks = [fetch_book(book_id) for book_id in book_requests]
    await asyncio.gather(*tasks)

    print(f"\nCache stats:")
    print(f"  Hits: {cache_stats['hits']}")
    print(f"  Misses: {cache_stats['misses']}")
    print(f"  Cache size: {cache_stats['size']}")
    print(f"  Actual cache size: {len(book_cache)}")
    print(f"  Expected: 6 hits, 4 misses")

asyncio.run(broken_cache_demo())

Output:

Fetching 10 books with 10 concurrent tasks...

Cache stats:
  Hits: 0
  Misses: 10
  Cache size: 4
  Actual cache size: 4
  Expected: 6 hits, 4 misses

Margaret examined the output. "You have a race condition, though not the one you might expect. Every task checks the cache before any of them has finished fetching, so they all see a miss and they all query the database. The check and the update are separated by an await, and other tasks run in that gap."

"But I thought async code only had one task running at a time?" Timothy said.

"Within a single thread, yes," Margaret explained. "But when you await, you give up control. Another task can run and modify the same variables. You need synchronization primitives to protect shared state."

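Margaret's point about await can be checked directly. The sketch below is an illustration only (the function names are hypothetical, not from Timothy's code): an update with no await inside it cannot be interrupted by another task, while a read and a write separated by an await can lose updates.

```python
import asyncio

async def bump_no_await(state):
    await asyncio.sleep(0)   # yield first so the tasks interleave between calls
    state["n"] += 1          # no await inside the update: no task switch can happen here

async def bump_across_await(state):
    current = state["n"]     # read
    await asyncio.sleep(0)   # other tasks run here and read the same value
    state["n"] = current + 1 # write back a stale result

async def compare(num_tasks=100):
    safe = {"n": 0}
    racy = {"n": 0}
    await asyncio.gather(*(bump_no_await(safe) for _ in range(num_tasks)))
    await asyncio.gather(*(bump_across_await(racy) for _ in range(num_tasks)))
    return safe["n"], racy["n"]

if __name__ == "__main__":
    safe, racy = asyncio.run(compare())
    print(f"no await inside update: {safe}")
    print(f"await between read and write: {racy}")
```

The first count reaches the number of tasks; the second falls short because tasks overwrite each other's results. The thing to protect is therefore any sequence that reads shared state, awaits, and then writes.
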
Understanding asyncio.Lock

"The most fundamental primitive is the Lock," Margaret said. "It ensures only one task accesses a critical section at a time."

import asyncio

# Shared state
counter = 0
lock = asyncio.Lock()

async def increment_without_lock():
    """Unsafe increment - race condition"""
    global counter
    current = counter
    await asyncio.sleep(0)  # Simulate some async work
    counter = current + 1

async def increment_with_lock():
    """Safe increment - protected by lock"""
    global counter
    async with lock:
        current = counter
        await asyncio.sleep(0)  # Other tasks blocked here
        counter = current + 1

async def demo_lock():
    """Compare locked vs unlocked operations"""
    global counter

    # Without lock
    counter = 0
    print("Without lock:")
    await asyncio.gather(*[increment_without_lock() for _ in range(10)])
    print(f"  Counter: {counter} (expected: 10)\n")

    # With lock
    counter = 0
    print("With lock:")
    await asyncio.gather(*[increment_with_lock() for _ in range(10)])
    print(f"  Counter: {counter} (expected: 10)")

asyncio.run(demo_lock())

Output:

Without lock:
  Counter: 1 (expected: 10)

With lock:
  Counter: 10 (expected: 10)

"See the difference?" Margaret said. "The lock ensures that once a task enters the async with lock: block, no other task can enter until the first one exits."

"One critical warning," Margaret added. "Locks are not reentrant. If a task tries to acquire the same lock twice, it will deadlock itself."

import asyncio

lock = asyncio.Lock()

async def will_deadlock():
    """This will hang forever!"""
    async with lock:
        print("Acquired lock once")
        async with lock:  # Trying to acquire same lock again - DEADLOCK!
            print("This never prints")

# Don't run this - it will hang!
# asyncio.run(will_deadlock())

"If you need reentrant locks, you must track ownership manually," Margaret said. "But usually, restructure your code instead."

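For the rare case where restructuring is not an option, here is a minimal sketch of what "tracking ownership manually" could look like. ReentrantLock is a hypothetical helper written for this illustration; asyncio itself does not provide one. It remembers which task holds the lock (via asyncio.current_task()) and how many times that task has entered.

```python
import asyncio

class ReentrantLock:
    """Hypothetical helper: a lock the owning task may re-enter."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._owner = None   # task currently holding the lock
        self._depth = 0      # how many times the owner has entered

    async def __aenter__(self):
        me = asyncio.current_task()
        if self._owner is me:
            self._depth += 1          # already ours: just count the nesting
            return self
        await self._lock.acquire()    # otherwise wait like a normal lock
        self._owner = me
        self._depth = 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self._depth -= 1
        if self._depth == 0:          # outermost exit releases the real lock
            self._owner = None
            self._lock.release()

async def nested(rlock, log):
    async with rlock:
        log.append("outer")
        async with rlock:             # would hang with a plain asyncio.Lock
            log.append("inner")

async def reentrant_demo():
    rlock = ReentrantLock()
    log = []
    await asyncio.wait_for(nested(rlock, log), timeout=1.0)
    return log

if __name__ == "__main__":
    print(asyncio.run(reentrant_demo()))
```

The extra bookkeeping is the price of reentrancy, which is one reason restructuring the code is usually the simpler choice.
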
Fixing the Cache with Lock

Margaret refactored Timothy's cache:

import asyncio

class BookCache:
    """Task-safe book cache (asyncio.Lock coordinates tasks, not threads)"""

    def __init__(self):
        self.cache = {}
        self.pending = {}  # book_id -> Future for a fetch already in flight
        self.stats = {"hits": 0, "misses": 0}
        self.lock = asyncio.Lock()

    async def get(self, book_id):
        """Get book from cache or fetch from database"""
        async with self.lock:
            # Check cache
            if book_id in self.cache:
                self.stats["hits"] += 1
                return self.cache[book_id]

            future = self.pending.get(book_id)
            if future is not None:
                # Another task is already fetching this book: count it as a
                # hit and share that result instead of querying again
                self.stats["hits"] += 1
                is_fetcher = False
            else:
                # Cache miss - this task does the fetch
                self.stats["misses"] += 1
                future = asyncio.get_running_loop().create_future()
                self.pending[book_id] = future
                is_fetcher = True

        if not is_fetcher:
            return await future  # Wait outside the lock

        # Fetch from database (outside lock - don't hold lock during I/O!)
        # A real fetch can fail; production code must resolve the future then too
        await asyncio.sleep(0.1)
        book_data = {"id": book_id, "title": f"Book {book_id}"}

        # Update cache and wake any tasks waiting on this book
        async with self.lock:
            self.cache[book_id] = book_data
            del self.pending[book_id]
        future.set_result(book_data)

        return book_data

    def get_stats(self):
        """Get current statistics"""
        return {
            "hits": self.stats["hits"],
            "misses": self.stats["misses"],
            "size": len(self.cache)
        }

async def fixed_cache_demo():
    """Demonstrate task-safe cache"""
    cache = BookCache()

    print("Fetching 10 books with 10 concurrent tasks...")

    # Request same books multiple times
    book_requests = [1, 2, 3, 1, 2, 3, 1, 2, 3, 4]

    tasks = [cache.get(book_id) for book_id in book_requests]
    await asyncio.gather(*tasks)

    stats = cache.get_stats()
    print(f"\nCache stats:")
    print(f"  Hits: {stats['hits']}")
    print(f"  Misses: {stats['misses']}")
    print(f"  Cache size: {stats['size']}")
    print(f"  Result: Correct!")

asyncio.run(fixed_cache_demo())

Output:

Fetching 10 books with 10 concurrent tasks...

Cache stats:
  Hits: 6
  Misses: 4
  Cache size: 4
  Result: Correct!

"Notice I acquire the lock twice," Margaret explained. "Once to check the cache and record the hit or miss, and again to store the result. In between, the lock is released during the slow database fetch, and tasks that ask for a book already being fetched simply wait for that fetch. Avoid holding a lock across slow I/O, because every task that needs the lock has to wait behind it."

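The cost of holding a lock across I/O is easy to measure. The following sketch is an illustration under simple assumptions (a 50 ms sleep stands in for the database call, and the function names are made up for this example). It times the same five tasks twice: once sleeping inside the lock, once after releasing it.

```python
import asyncio
import time

async def fetch_holding_lock(lock):
    async with lock:
        await asyncio.sleep(0.05)   # simulated I/O while holding the lock

async def fetch_releasing_lock(lock, seen):
    async with lock:
        seen.append(1)              # short critical section only
    await asyncio.sleep(0.05)       # simulated I/O with the lock released

async def time_both(num_tasks=5):
    lock = asyncio.Lock()

    start = time.perf_counter()
    await asyncio.gather(*(fetch_holding_lock(lock) for _ in range(num_tasks)))
    held = time.perf_counter() - start

    seen = []
    start = time.perf_counter()
    await asyncio.gather(*(fetch_releasing_lock(lock, seen) for _ in range(num_tasks)))
    released = time.perf_counter() - start
    return held, released

if __name__ == "__main__":
    held, released = asyncio.run(time_both())
    print(f"lock held during I/O:     {held:.2f}s")
    print(f"lock released before I/O: {released:.2f}s")
```

With the lock held, the sleeps run one after another, so the total grows with the number of tasks. With the lock released first, the sleeps overlap and the total stays close to a single sleep.
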
Manual Lock Acquisition

"Sometimes you can't use async with," Margaret said. "Here's the manual approach."

import asyncio

lock = asyncio.Lock()
counter = 0

async def manual_lock_usage():
    """Manually acquire and release lock"""
    global counter

    await lock.acquire()
    try:
        # Critical section
        counter += 1
        await asyncio.sleep(0.1)
    finally:
        lock.release()

async def demo_manual():
    """Using lock manually"""
    global counter
    counter = 0

    await asyncio.gather(*[manual_lock_usage() for _ in range(5)])
    print(f"Counter: {counter}")

asyncio.run(demo_manual())

Output:

Counter: 5

"Always use try/finally to ensure the lock gets released," Margaret emphasized. "But prefer async with when possible—it's safer."

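One situation where async with does not fit is giving up after a timeout. A minimal sketch, assuming asyncio.wait_for is an acceptable way to bound the wait (try_acquire is a hypothetical helper name):

```python
import asyncio

async def try_acquire(lock, timeout):
    """Return True if the lock was acquired within `timeout` seconds."""
    try:
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True

async def timeout_demo():
    lock = asyncio.Lock()

    await lock.acquire()                      # simulate another holder
    got_while_held = await try_acquire(lock, 0.05)
    lock.release()

    got_when_free = await try_acquire(lock, 0.05)
    if got_when_free:
        try:
            pass                              # critical section would go here
        finally:
            lock.release()                    # always release what we acquired
    return got_while_held, got_when_free

if __name__ == "__main__":
    print(asyncio.run(timeout_demo()))
```

The caller only releases the lock when try_acquire reports success, which keeps the acquire and release paired even on the timeout path.
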
Semaphore: Limiting Concurrent Access

"What if you want to allow some concurrency, but not unlimited?" Timothy asked. "Like limiting database connections?"

"That's what Semaphore is for," Margaret said. "It's like a lock that allows N tasks through at once."

import asyncio
import random

class DatabasePool:
    """Simulated database with connection limit"""

    def __init__(self, max_connections=3):
        self.semaphore = asyncio.Semaphore(max_connections)
        self.active_connections = 0
        self.max_active = 0

    async def query(self, query_id):
        """Execute a database query"""
        async with self.semaphore:
            self.active_connections += 1
            self.max_active = max(self.max_active, self.active_connections)

            print(f"  Query {query_id} executing (active: {self.active_connections})")
            await asyncio.sleep(random.uniform(0.5, 1.0))
            print(f"  Query {query_id} completed")

            self.active_connections -= 1

async def semaphore_demo():
    """Demonstrate semaphore limiting concurrency"""
    db = DatabasePool(max_connections=3)

    print("Starting 10 database queries (max 3 concurrent)...\n")

    tasks = [db.query(i) for i in range(10)]
    await asyncio.gather(*tasks)

    print(f"\nMax concurrent connections: {db.max_active}")

asyncio.run(semaphore_demo())

Output:

Starting 10 database queries (max 3 concurrent)...

  Query 0 executing (active: 1)
  Query 1 executing (active: 2)
  Query 2 executing (active: 3)
  Query 0 completed
  Query 3 executing (active: 3)
  Query 1 completed
  Query 4 executing (active: 3)
  Query 2 completed
  Query 5 executing (active: 3)
  Query 3 completed
  Query 6 executing (active: 3)
...

Max concurrent connections: 3

"The semaphore ensures never more than 3 queries run simultaneously," Margaret explained. "Perfect for connection pools, rate limiting, or any limited resource."

Connection Pool with Semaphore

"Here's a practical use case," Margaret said. "Limiting concurrent connections to an external service."

import asyncio

class ConnectionPool:
    """Connection pool with limited size"""

    def __init__(self, max_connections):
        self.semaphore = asyncio.Semaphore(max_connections)
        self.active = 0
        self.total_requests = 0

    async def request(self, request_id):
        """Make a request using the connection pool"""
        self.total_requests += 1

        async with self.semaphore:
            self.active += 1
            print(f"  Request {request_id}: Processing (active: {self.active})")
            await asyncio.sleep(0.5)  # Simulate request
            self.active -= 1

async def connection_pool_demo():
    """Demonstrate connection pooling"""
    pool = ConnectionPool(max_connections=3)

    print("Making 10 requests (max 3 concurrent connections)...\n")

    tasks = [pool.request(i) for i in range(10)]
    await asyncio.gather(*tasks)

    print(f"\nTotal requests: {pool.total_requests}")

asyncio.run(connection_pool_demo())

Output:

Making 10 requests (max 3 concurrent connections)...

  Request 0: Processing (active: 1)
  Request 1: Processing (active: 2)
  Request 2: Processing (active: 3)
  Request 3: Processing (active: 3)
  Request 4: Processing (active: 3)
  Request 5: Processing (active: 3)
  Request 6: Processing (active: 3)
  Request 7: Processing (active: 3)
  Request 8: Processing (active: 3)
  Request 9: Processing (active: 3)

Total requests: 10

"Again the semaphore caps concurrency at 3," Margaret explained. "The active count printed for a given request can vary with scheduling, but it never goes above the limit."

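The same idea can be packaged as a small helper that runs any batch of coroutines with a concurrency cap. This is a sketch, not a standard library function: gather_limited and the demo job are hypothetical names introduced for illustration.

```python
import asyncio

async def gather_limited(limit, coros):
    """Run coroutines concurrently, with at most `limit` in progress at once."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:            # wait for a free slot before starting
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))

async def limited_demo():
    running = 0
    peak = 0

    async def job(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # record the highest concurrency seen
        await asyncio.sleep(0.01)
        running -= 1
        return i * i

    results = await gather_limited(3, [job(i) for i in range(10)])
    return results, peak

if __name__ == "__main__":
    results, peak = asyncio.run(limited_demo())
    print(results)
    print(f"peak concurrency: {peak}")
```

Results come back in submission order because asyncio.gather preserves it, while the semaphore keeps the number of jobs in progress at or below the limit.
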
BoundedSemaphore: Preventing Over-Release

"One warning about regular Semaphore," Margaret added. "You can accidentally release more than you acquired."

import asyncio

async def semaphore_problem():
    """Regular semaphore allows over-release"""
    sem = asyncio.Semaphore(2)

    print("Initial semaphore value: 2")

    # Accidentally release without acquiring
    sem.release()
    sem.release()

    print("After two releases: 4 (whoops!)")

    # Now 4 tasks can acquire it
    for i in range(4):
        await sem.acquire()  # acquire() is a coroutine and must be awaited
        print(f"  Task {i} acquired")

async def bounded_semaphore_solution():
    """BoundedSemaphore prevents over-release"""
    sem = asyncio.BoundedSemaphore(2)

    print("\nWith BoundedSemaphore:")

    try:
        sem.release()  # This will raise ValueError!
    except ValueError as e:
        print(f"  Error: {e}")

asyncio.run(semaphore_problem())
asyncio.run(bounded_semaphore_solution())

Output:

Initial semaphore value: 2
After two releases: 4 (whoops!)
  Task 0 acquired
  Task 1 acquired
  Task 2 acquired
  Task 3 acquired

With BoundedSemaphore:
  Error: BoundedSemaphore released too many times

"Use BoundedSemaphore when you want safety," Margaret advised.

Event: Signaling Between Tasks

"Sometimes you need to signal between tasks," Margaret said. "Like waiting for initialization to complete before processing requests."

import asyncio

async def initializer(ready_event):
    """Initialize system"""
    print("Initializer: Starting system initialization...")
    await asyncio.sleep(2)  # Simulate slow startup
    print("Initializer: System ready!")
    ready_event.set()  # Signal that we're ready

async def worker(worker_id, ready_event):
    """Worker that waits for initialization"""
    print(f"Worker {worker_id}: Waiting for system to be ready...")
    await ready_event.wait()  # Block until event is set
    print(f"Worker {worker_id}: Processing requests!")
    await asyncio.sleep(0.5)

async def event_demo():
    """Demonstrate event signaling"""
    ready_event = asyncio.Event()

    # Start workers and initializer
    tasks = [
        asyncio.create_task(initializer(ready_event)),
        asyncio.create_task(worker(1, ready_event)),
        asyncio.create_task(worker(2, ready_event)),
        asyncio.create_task(worker(3, ready_event)),
    ]

    await asyncio.gather(*tasks)
    print("\nAll workers completed!")

asyncio.run(event_demo())

Output:

Initializer: Starting system initialization...
Worker 1: Waiting for system to be ready...
Worker 2: Waiting for system to be ready...
Worker 3: Waiting for system to be ready...
Initializer: System ready!
Worker 1: Processing requests!
Worker 2: Processing requests!
Worker 3: Processing requests!

All workers completed!

"The Event has two states: set or not set," Margaret explained. "Tasks calling wait() block until the event is set with set(). Then every waiting task becomes runnable, and the event loop resumes them one after another."

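An Event can also be reset. The sketch below is illustrative (gated_worker and gate_demo are hypothetical names): it uses set(), clear(), and is_set() together so a worker stays blocked until a gate opens.

```python
import asyncio

async def gated_worker(gate, results, count):
    for i in range(count):
        await gate.wait()          # returns immediately while the gate is set
        results.append(i)
        await asyncio.sleep(0)     # yield so other tasks can run

async def gate_demo():
    gate = asyncio.Event()         # starts in the "not set" state
    results = []
    task = asyncio.create_task(gated_worker(gate, results, 3))

    await asyncio.sleep(0.01)
    before_open = list(results)    # worker is still blocked in wait()

    gate.set()                     # open the gate
    await task
    gate.clear()                   # close it again for any future waiters
    return before_open, results, gate.is_set()

if __name__ == "__main__":
    print(asyncio.run(gate_demo()))
```

Nothing is processed before set() is called, and after clear() the event reports is_set() as False again, so later calls to wait() would block until the next set().
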
Event for Shutdown Coordination

"Events are perfect for shutdown signals," Margaret demonstrated.

import asyncio

async def background_worker(worker_id, shutdown_event):
    """Worker that processes until shutdown"""
    print(f"Worker {worker_id}: Started")

    while not shutdown_event.is_set():
        print(f"Worker {worker_id}: Processing...")
        try:
            await asyncio.wait_for(shutdown_event.wait(), timeout=1.0)
        except asyncio.TimeoutError:
            pass  # Continue working

    print(f"Worker {worker_id}: Shutting down")

async def shutdown_demo():
    """Demonstrate graceful shutdown with event"""
    shutdown_event = asyncio.Event()

    # Start background workers
    workers = [
        asyncio.create_task(background_worker(i, shutdown_event))
        for i in range(3)
    ]

    # Let them run for a bit
    await asyncio.sleep(3)

    # Signal shutdown
    print("\nMain: Sending shutdown signal...")
    shutdown_event.set()

    # Wait for workers to finish
    await asyncio.gather(*workers)
    print("Main: All workers stopped")

asyncio.run(shutdown_demo())

Output:

Worker 0: Started
Worker 0: Processing...
Worker 1: Started
Worker 1: Processing...
Worker 2: Started
Worker 2: Processing...
Worker 0: Processing...
Worker 1: Processing...
Worker 2: Processing...
...

Main: Sending shutdown signal...
Worker 0: Shutting down
Worker 1: Shutting down
Worker 2: Shutting down
Main: All workers stopped

Combining Lock and Event

"You can combine primitives for complex coordination," Margaret said.

import asyncio

class Cache:
    """Cache with refresh coordination"""

    def __init__(self):
        self.data = {}
        self.lock = asyncio.Lock()
        self.refreshing = asyncio.Event()
        self.refreshing.set()  # Start as ready

    async def get(self, key):
        """Get value from cache"""
        # Wait if refresh is in progress
        await self.refreshing.wait()

        async with self.lock:
            return self.data.get(key)

    async def refresh(self):
        """Refresh entire cache"""
        print("  Refresh: Starting...")
        self.refreshing.clear()  # Block all gets

        # Simulate fetching new data
        await asyncio.sleep(1)

        async with self.lock:
            self.data = {"book_1": "Data_1", "book_2": "Data_2"}

        self.refreshing.set()  # Unblock gets
        print("  Refresh: Complete")

async def reader(cache, reader_id):
    """Task that reads from cache"""
    await asyncio.sleep(reader_id * 0.1)  # Stagger starts
    print(f"Reader {reader_id}: Requesting data...")
    data = await cache.get("book_1")
    print(f"Reader {reader_id}: Got {data}")

async def coordination_demo():
    """Demonstrate lock + event coordination"""
    cache = Cache()

    # Start readers and refresh
    tasks = [
        asyncio.create_task(reader(cache, 1)),
        asyncio.create_task(reader(cache, 2)),
        asyncio.create_task(cache.refresh()),
        asyncio.create_task(reader(cache, 3)),
        asyncio.create_task(reader(cache, 4)),
    ]

    await asyncio.gather(*tasks)

asyncio.run(coordination_demo())

Output:

  Refresh: Starting...
Reader 1: Requesting data...
Reader 2: Requesting data...
Reader 3: Requesting data...
Reader 4: Requesting data...
  Refresh: Complete
Reader 1: Got Data_1
Reader 2: Got Data_1
Reader 3: Got Data_1
Reader 4: Got Data_1

"Readers block during refresh, then all proceed once the refresh completes," Margaret explained.

Lock vs Semaphore vs Event

Margaret drew a comparison on the whiteboard:

When to Use Each:

Lock (asyncio.Lock)

  • Protect shared state from concurrent modification
  • Ensure only one task in critical section
  • Example: Updating cache statistics

Semaphore (asyncio.Semaphore)

  • Limit concurrent access to resource
  • Allow N tasks through at once
  • Example: Connection pools, rate limiting

BoundedSemaphore

  • Same as Semaphore but prevents over-release
  • Use when you need safety guarantees

Event (asyncio.Event)

  • Signal between tasks
  • Wake multiple waiting tasks
  • Example: Initialization complete, shutdown signal

Combining Primitives

  • Lock + Event for complex coordination
  • Semaphore + Lock for resource pools with stats
  • Build higher-level synchronization from primitives

Common Patterns

"Let me show you some real-world patterns," Margaret said.

Pattern 1: Lazy Initialization with Lock

import asyncio

class ServiceClient:
    """Client with lazy connection initialization"""

    def __init__(self):
        self.connection = None
        self.lock = asyncio.Lock()

    async def _ensure_connected(self):
        """Ensure connection exists (double-checked locking)"""
        if self.connection is None:
            async with self.lock:
                # Check again inside lock
                if self.connection is None:
                    print("  Initializing connection...")
                    await asyncio.sleep(0.5)  # Simulate connection
                    self.connection = "Connected"

    async def request(self, data):
        """Make a request"""
        await self._ensure_connected()
        print(f"  Request: {data}")

async def lazy_init_demo():
    """Demonstrate lazy initialization"""
    client = ServiceClient()

    # Multiple concurrent requests
    tasks = [client.request(f"data_{i}") for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(lazy_init_demo())

Output:

  Initializing connection...
  Request: data_0
  Request: data_1
  Request: data_2
  Request: data_3
  Request: data_4

Pattern 2: Resource Pool

import asyncio

class ResourcePool:
    """Pool of reusable resources"""

    def __init__(self, size):
        self.semaphore = asyncio.Semaphore(size)
        self.lock = asyncio.Lock()
        self.allocated = 0
        self.peak = 0

    async def acquire(self):
        """Acquire resource from pool"""
        await self.semaphore.acquire()

        async with self.lock:
            self.allocated += 1
            self.peak = max(self.peak, self.allocated)

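    def release(self):
        """Release resource back to pool"""
        # No await in this method, so no other task can interleave here;
        # update the count before handing the slot to the next waiter
        self.allocated -= 1
        self.semaphore.release()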

    async def use(self, task_id):
        """Use a resource from the pool"""
        await self.acquire()
        try:
            print(f"  Task {task_id}: Using resource")
            await asyncio.sleep(0.5)
        finally:
            self.release()

async def pool_demo():
    """Demonstrate resource pool"""
    pool = ResourcePool(size=3)

    tasks = [pool.use(i) for i in range(10)]
    await asyncio.gather(*tasks)

    print(f"\nPeak allocation: {pool.peak}")

asyncio.run(pool_demo())

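The acquire/try/finally/release sequence in the pool can also be wrapped once and reused. A minimal sketch, assuming contextlib.asynccontextmanager is acceptable here (TrackedPool and tracked_pool_demo are hypothetical names, not part of the pool above):

```python
import asyncio
from contextlib import asynccontextmanager

class TrackedPool:
    """Hypothetical variant of the resource pool exposed as a context manager."""

    def __init__(self, size):
        self._sem = asyncio.Semaphore(size)
        self.allocated = 0
        self.peak = 0

    @asynccontextmanager
    async def resource(self):
        async with self._sem:              # wait for a free slot
            self.allocated += 1
            self.peak = max(self.peak, self.allocated)
            try:
                yield
            finally:
                self.allocated -= 1        # runs even if the caller raises

async def tracked_pool_demo():
    pool = TrackedPool(size=3)

    async def use(task_id):
        async with pool.resource():
            await asyncio.sleep(0.01)      # simulated work with the resource

    await asyncio.gather(*(use(i) for i in range(10)))
    return pool.peak, pool.allocated

if __name__ == "__main__":
    peak, allocated = asyncio.run(tracked_pool_demo())
    print(f"peak: {peak}, still allocated: {allocated}")
```

Callers then write async with pool.resource(): and cannot forget the release. The counters need no separate lock in this sketch because they are updated without an await in between.
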
Avoiding Deadlocks

"One more critical pattern," Margaret said. "When you need multiple locks, always acquire them in the same order."

import asyncio

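class Shelf:
    """Minimal stand-in for a shelf so the functions below have something to lock"""

    def __init__(self, books):
        self.books = list(books)
        self.lock = asyncio.Lock()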

async def transfer_books(from_shelf, to_shelf, book):
    """Good: Always acquire locks in consistent order"""
    locks = sorted([from_shelf.lock, to_shelf.lock], key=id)

    async with locks[0]:
        async with locks[1]:
            # Safe to modify both shelves
            from_shelf.books.remove(book)
            to_shelf.books.append(book)

async def bad_transfer(shelf1, shelf2, book):
    """Bad: Lock order depends on argument order - can deadlock!"""
    async with shelf1.lock:
        await asyncio.sleep(0)  # Other task might run here
        async with shelf2.lock:  # Might wait forever if other task has opposite order
            pass

"If Task A holds Lock 1 and waits for Lock 2, while Task B holds Lock 2 and waits for Lock 1, both deadlock," Margaret explained. "Consistent ordering prevents this."

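To see consistent ordering at work, the sketch below runs two transfers in opposite directions at the same time. It is a self-contained illustration: the Shelf class, the book titles, and ordered_transfer are assumptions made for this example. Both tasks sort the shelves the same way before locking, so neither can hold one lock while waiting for the other in reverse order.

```python
import asyncio

class Shelf:
    """Hypothetical shelf with its own lock."""

    def __init__(self, name, books):
        self.name = name
        self.books = list(books)
        self.lock = asyncio.Lock()

async def ordered_transfer(src, dst, book):
    # Lock the two shelves in one global order, whatever the transfer direction.
    # Assumes src and dst are different shelves (asyncio.Lock is not reentrant).
    first, second = sorted([src, dst], key=id)
    async with first.lock:
        await asyncio.sleep(0)     # yield so the other transfer can interleave
        async with second.lock:
            src.books.remove(book)
            dst.books.append(book)

async def opposite_transfers():
    a = Shelf("A", ["Dune"])
    b = Shelf("B", ["Emma"])
    # The timeout turns a deadlock, if there were one, into a visible error
    await asyncio.wait_for(
        asyncio.gather(
            ordered_transfer(a, b, "Dune"),
            ordered_transfer(b, a, "Emma"),
        ),
        timeout=1.0,
    )
    return a.books, b.books

if __name__ == "__main__":
    print(asyncio.run(opposite_transfers()))
```

Sorting by id() is just one way to get a stable order within a single process; any key that ranks the locks the same way for every task would do.
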
The Takeaway

Timothy closed his laptop, his cache now working correctly with proper synchronization.

Key insights:

  • Use asyncio.Lock() to protect shared state from concurrent modification
  • Only one task can hold a lock at a time
  • Race conditions appear when a read and its matching write are separated by an await; the result then depends on how tasks interleave
  • Use async with lock: for automatic acquire/release
  • Avoid holding a lock during slow I/O operations; release it first
  • Locks are NOT reentrant: a task that acquires the same lock twice deadlocks itself
  • When using multiple locks, always acquire them in the same order to avoid deadlocks
  • Use asyncio.Semaphore(N) to limit concurrent access to N tasks
  • Semaphores are well suited to connection pools and rate limiting
  • BoundedSemaphore prevents accidentally releasing more than you acquired
  • Use asyncio.Event() to signal between tasks
  • event.set() wakes every waiting task
  • event.clear() blocks future waits until set() is called again
  • Check event.is_set() to test state without blocking
  • Combine primitives for complex coordination (Lock + Event)
  • Lock protects data, Event coordinates timing
  • Always use try/finally when manually acquiring locks
  • Double-checked locking pattern: check, lock, check again
  • Resource pools combine Semaphore with Lock for statistics
  • Prefer async with over manual acquire/release when possible

Shared-State Synchronization

Margaret and Timothy had moved from message-passing (queues) to shared-state synchronization. The primitives were simple (Lock, Semaphore, Event), but combined, they could coordinate even complex async systems. Timothy's cache was now safe under concurrent tasks, his database connections were properly limited, and his workers could coordinate startup and shutdown cleanly.

As Timothy reviewed the code, he realized that while queues were better for most async coordination, sometimes you genuinely needed shared state. When you did, these primitives ensured that state stayed consistent even when multiple tasks accessed it concurrently.


Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.
