Introduction
When working with concurrent programs, it is crucial to prevent race conditions. In this article, we will implement a scalable and efficient key-based mutex for managing multiple threads. Instead of a single global lock, each key gets its own lock, so threads working on different resources do not block each other.
As will be shown, using the mutex instead of a simple single-lock solution reduced execution time by up to 90.2%.
Source code can be found at: GitHub
PS.: Python version used: 3.10
PS.: Notebook configuration used in these tests: Intel i7-1165G7 2.80 GHz | 16 GB RAM
1. Case Study 📝
By exploring a race condition problem, we will develop a mutex to address the problem efficiently.
1.1. Race Condition Example
We have multiple counter objects, a function that increases a counter, and multiple threads.
1.1.1. Counters
from dataclasses import dataclass

@dataclass
class Counter:
    value: int
    name: str

first_counter = Counter(value=0, name="first_counter")
second_counter = Counter(value=0, name="second_counter")
third_counter = Counter(value=0, name="third_counter")
fourth_counter = Counter(value=0, name="fourth_counter")
fifth_counter = Counter(value=0, name="fifth_counter")
sixth_counter = Counter(value=0, name="sixth_counter")
1.1.2. Increase Counter Function
from time import sleep

def increase(by: int, counter: Counter):
    local_counter = counter.value  # read
    local_counter += by            # modify
    sleep(0.1)  # processing time simulation
    counter.value = local_counter  # write back
1.1.3. Execution
import threading
from time import time

if __name__ == "__main__":
    threads = [
        threading.Thread(target=increase, args=(1, first_counter)),
        threading.Thread(target=increase, args=(1, fourth_counter)),
        threading.Thread(target=increase, args=(1, sixth_counter)),
        threading.Thread(target=increase, args=(1, third_counter)),
        threading.Thread(target=increase, args=(1, first_counter)),
        threading.Thread(target=increase, args=(1, fifth_counter)),
        threading.Thread(target=increase, args=(1, first_counter)),
        threading.Thread(target=increase, args=(1, second_counter)),
        threading.Thread(target=increase, args=(1, fourth_counter)),
        threading.Thread(target=increase, args=(1, first_counter)),
        threading.Thread(target=increase, args=(1, second_counter)),
        threading.Thread(target=increase, args=(1, first_counter)),
        threading.Thread(target=increase, args=(1, fourth_counter)),
        threading.Thread(target=increase, args=(1, first_counter)),
        threading.Thread(target=increase, args=(1, sixth_counter)),
        threading.Thread(target=increase, args=(1, third_counter)),
        threading.Thread(target=increase, args=(1, third_counter)),
        threading.Thread(target=increase, args=(1, fifth_counter)),
        threading.Thread(target=increase, args=(1, third_counter)),
        threading.Thread(target=increase, args=(1, sixth_counter)),
        threading.Thread(target=increase, args=(1, third_counter)),
    ]

    initial_time = time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    final_time = time()

    print(f"\nFinal value {first_counter.name}:", first_counter.value)
    print(f"Final value {second_counter.name}:", second_counter.value)
    print(f"Final value {third_counter.name}:", third_counter.value)
    print(f"Final value {fourth_counter.name}:", fourth_counter.value)
    print(f"Final value {fifth_counter.name}:", fifth_counter.value)
    print(f"Final value {sixth_counter.name}:", sixth_counter.value)
    print(f"\nExecution time: {round(final_time - initial_time, 3)} seconds")
1.1.4. Execution Results
Final value first_counter: 1
Final value second_counter: 1
Final value third_counter: 1
Final value fourth_counter: 1
Final value fifth_counter: 1
Final value sixth_counter: 1
Execution time: 0.102 seconds
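That is not what we expected. Several threads modified the same counters, yet every counter ended at 1. This is a race condition. Each thread reads counter.value while it is still 0, sleeps, and then writes 0 + 1 back. Every write therefore overwrites the others, and those increments are lost. But how can we solve this?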
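The lost update can be reproduced deterministically. The sketch below is not from the original article. It uses a threading.Barrier, which stands in for the sleep, to force every thread to read the value before any thread writes it back. The names run_racy_increments and increase_racy are hypothetical.

```python
import threading
from dataclasses import dataclass


@dataclass
class Counter:
    value: int
    name: str


def run_racy_increments(n_threads: int) -> int:
    """Force the read-modify-write interleaving that loses updates."""
    counter = Counter(value=0, name="demo_counter")
    # no thread can pass the barrier (and write) until every thread has read
    barrier = threading.Barrier(n_threads)

    def increase_racy(by: int) -> None:
        local_counter = counter.value  # every thread reads 0 here
        barrier.wait()                 # plays the role of sleep(0.1)
        counter.value = local_counter + by  # each write overwrites the others

    threads = [threading.Thread(target=increase_racy, args=(1,)) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


print(run_racy_increments(6))  # 6 increments, but the final value is 1
```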
1.2. Solving the Problem
What if we could lock the counter so that only one thread increases it at a time? Let's insert a lock into the function so that every thread has to wait for its turn to increase the counter.
We can do that by using a threading.Lock in the increase function:
import threading
from time import sleep

lock = threading.Lock()

def increase(by: int, counter: Counter):
    with lock:  # only one thread at a time runs this block
        local_counter = counter.value
        local_counter += by
        sleep(0.1)  # processing time simulation
        counter.value = local_counter
When a thread enters the with block, it acquires the lock. Any other thread that reaches that point blocks until the lock is free. The lock is released when the thread exits the block, even if an exception is raised inside it.
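The with statement is shorthand for acquiring the lock and releasing it in a finally clause. The sketch below spells out that explicit form. The function name increase_explicit is hypothetical, and the sleep is shortened so the example runs quickly.

```python
import threading
from dataclasses import dataclass
from time import sleep

lock = threading.Lock()


@dataclass
class Counter:
    value: int
    name: str


def increase_explicit(by: int, counter: Counter) -> None:
    """Same behaviour as `with lock:`, written out by hand."""
    lock.acquire()  # blocks until no other thread holds the lock
    try:
        local_counter = counter.value
        local_counter += by
        sleep(0.01)  # processing time simulation (shortened)
        counter.value = local_counter
    finally:
        lock.release()  # always runs, even if the body raises


counter = Counter(value=0, name="demo_counter")
threads = [threading.Thread(target=increase_explicit, args=(1, counter)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter.value)  # 5: no update is lost
```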
1.2.1. Results After Simple Lock
Final value first_counter: 6
Final value second_counter: 2
Final value third_counter: 5
Final value fourth_counter: 3
Final value fifth_counter: 2
Final value sixth_counter: 3
Execution time: 2.108 seconds
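As we can see, execution took much longer after inserting the lock. A single global lock serializes all 21 threads, even those updating different counters, so the run takes about 21 × 0.1 s ≈ 2.1 s. How can we improve it? That is what our mutex is going to do.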
2. Our Goal
The mutex has to meet some requirements:
- Be efficient
- Be easy to maintain
- Be scalable - see Open–closed principle
For the sake of efficiency, our mutex will hold multiple locks, each associated with a specific key. When a thread provides a key, the mutex creates a lock for that key if none exists yet. Otherwise, the thread waits on the existing lock for that key. As a result, threads using different keys never wait on each other. Since the mutex has to be available to the entire system, it needs to be a thread-safe Singleton.
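The core idea fits in a few lines. A dictionary maps each key to its own lock, and a small registry lock protects the dictionary itself. This is a simplified sketch under stated assumptions. The KeyedLocks name and its hold method are hypothetical, and unlike the CustomMutex below, which tracks a per-key counter, this version never discards unused locks.

```python
import threading
from collections import defaultdict
from contextlib import contextmanager


class KeyedLocks:
    """One lock per key; threads using different keys never block each other."""

    def __init__(self) -> None:
        self._locks: defaultdict[str, threading.Lock] = defaultdict(threading.Lock)
        self._registry_lock = threading.Lock()  # protects the dict, not the keys

    @contextmanager
    def hold(self, key: str):
        with self._registry_lock:
            key_lock = self._locks[key]  # created on first use
        with key_lock:  # wait only for threads using the same key
            yield


locks = KeyedLocks()
with locks.hold("first_counter"):
    # a different key can be taken at the same time without waiting
    with locks.hold("second_counter"):
        print("holding two different keys at once")
```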
3. Mutex 🚦
Now let's implement what we have defined.
3.1. Thread Safe Singleton
import threading

class SingletonThreadSafeMeta(type):
    """Thread safe Singleton"""
    __instances = {}
    __lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        with cls.__lock:
            if cls not in cls.__instances:
                instance = super().__call__(*args, **kwargs)
                cls.__instances[cls] = instance
        return cls.__instances[cls]
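One way to check the metaclass is to construct the class from many threads at once and confirm that every thread gets the same object. The metaclass is repeated so the snippet runs on its own. The Registry class is hypothetical.

```python
import threading


class SingletonThreadSafeMeta(type):
    """Thread safe Singleton (same as above)"""
    __instances = {}
    __lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        with cls.__lock:
            if cls not in cls.__instances:
                instance = super().__call__(*args, **kwargs)
                cls.__instances[cls] = instance
        return cls.__instances[cls]


class Registry(metaclass=SingletonThreadSafeMeta):
    def __init__(self) -> None:
        self.items: dict[str, int] = {}


results: list[Registry] = []
results_lock = threading.Lock()


def build() -> None:
    instance = Registry()  # every call goes through the metaclass __call__
    with results_lock:
        results.append(instance)


threads = [threading.Thread(target=build) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len({id(r) for r in results}))  # 1: a single shared instance
```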
3.2. Key Based Mutex
import threading

from singleton_thread_safe import SingletonThreadSafeMeta

class CustomMutex(metaclass=SingletonThreadSafeMeta):
    def __init__(self):
        # key -> lock shared by every thread using that key
        self.__lockers: dict[str, threading.Lock] = {}
        # key -> number of threads currently holding or waiting for that lock
        self.__lockers_counter: dict[str, int] = {}
        # protects the two dicts above; without it, two threads could both
        # see a missing key and create two different locks for it
        self.__registry_lock = threading.Lock()

    def __decrease_counter(self, key: str) -> None:
        self.__lockers_counter[key] -= 1
        if self.__lockers_counter[key] == 0:  # nobody holds or waits for this key
            del self.__lockers[key]
            del self.__lockers_counter[key]

    def __get_thread_locker(self, key: str) -> threading.Lock:
        if key in self.__lockers:
            self.__lockers_counter[key] += 1
            return self.__lockers[key]
        self.__lockers_counter[key] = 1
        thread_locker = threading.Lock()
        self.__lockers[key] = thread_locker
        return thread_locker

    def lock_with_key(self, key: str) -> None:
        """Blocks until the lock associated with the key is acquired"""
        with self.__registry_lock:
            thread_locker = self.__get_thread_locker(key)
        # wait outside the registry lock so threads on other keys are not blocked
        thread_locker.acquire()

    def unlock_with_key(self, key: str) -> None:
        """Releases the lock associated with the key"""
        with self.__registry_lock:
            if key not in self.__lockers:
                return
            thread_locker = self.__lockers[key]
            self.__decrease_counter(key)
            thread_locker.release()
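The CustomMutex class manages the relation between locks and keys, which is stored in the self.__lockers attribute. The self.__lockers_counter attribute tracks how many threads are holding or waiting for each key, so a lock can be discarded once nobody uses it.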
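One might ask where the queue of threads waiting for a lock is. There is no explicit queue. A thread that reaches thread_locker.acquire() while the lock is held simply blocks until the lock is released, so we don't need to build a queue ourselves. Note, however, that threading.Lock does not guarantee FIFO order. According to the Python documentation, which waiting thread proceeds after a release is undefined and may vary across implementations.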
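Python's threading.Lock does not promise to wake waiting threads in arrival order. If strict FIFO order matters, one option is a ticket lock built on threading.Condition. This is a swapped-in technique, not part of the original design. Each thread takes a numbered ticket and waits until that number is being served. The TicketLock name is hypothetical.

```python
import threading


class TicketLock:
    """FIFO lock: threads enter in the order they called acquire()."""

    def __init__(self) -> None:
        self._condition = threading.Condition()
        self._next_ticket = 0   # next number handed out
        self._now_serving = 0   # ticket currently allowed to hold the lock

    def acquire(self) -> None:
        with self._condition:
            my_ticket = self._next_ticket
            self._next_ticket += 1
            # wait_for re-checks the predicate every time we are notified
            self._condition.wait_for(lambda: self._now_serving == my_ticket)

    def release(self) -> None:
        with self._condition:
            self._now_serving += 1
            self._condition.notify_all()  # wake waiters so the next ticket can enter

    def __enter__(self) -> "TicketLock":
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        self.release()
```

The trade-off is that every release wakes all waiters so they can compare tickets. A plain Lock avoids that cost but gives no ordering guarantee.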
4. Workers 🦺
Now that we have the mutex, we need workers that lock threads based on keys. Each thread gets its own worker instance, and every worker talks to the mutex to lock and unlock keys.
Each worker is also responsible for handling timeouts, in case a thread holds a key for too long.
4.1. Abstract Worker
Since we can have multiple workers, it is important to define an interface so that they all work the same way.
from abc import ABC, abstractmethod

from mutex import CustomMutex

class MutexWorkerAbstract(ABC):
    @abstractmethod
    def __enter__(self) -> "MutexWorkerAbstract":
        """implement context manager"""

    @abstractmethod
    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        """implement context manager"""

    @property
    @abstractmethod
    def custom_mutex(self) -> CustomMutex:
        """Enforcing composition with CustomMutex"""
As we dive into the worker's structure, the purpose of the context manager will become clearer.
4.2. Worker Counter
import threading

from mutex import CustomMutex
from worker_abstract import MutexWorkerAbstract

class CounterMutexWorker(MutexWorkerAbstract):
    def __init__(self):
        self.__general_mutex = CustomMutex()
        self.__counter_key_lock: str | None = None
        self.__timeout = 5  # seconds
        # a Timer can only be started once, so each worker serves one lock/unlock cycle
        self.__timeout_timer = threading.Timer(
            self.__timeout,
            function=self.__timeout_callback
        )
        # serializes unlock calls coming from the owner thread and the timer thread
        self.__state_lock = threading.Lock()

    @property
    def custom_mutex(self) -> CustomMutex:
        return self.__general_mutex

    def __enter__(self) -> "CounterMutexWorker":
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        # runs even if the body raised, so the key is not held until the timeout
        self.unlock_by_counter()

    def __format_key_lock_thread(self, counter_name: str) -> str:
        # the prefix keeps this worker's keys apart from other workers' keys
        key = f"COUNTER_WORKER_KEY_{counter_name}"
        return key

    def __timeout_callback(self) -> None:
        print(f"Lock timeout! worker=CounterMutexWorker | key={self.__counter_key_lock}")
        print(f"Releasing Lock... worker=CounterMutexWorker key={self.__counter_key_lock}")
        self.unlock_by_counter()

    def lock_by_counter(self, counter_name: str) -> None:
        """Locks thread by counter name"""
        key = self.__format_key_lock_thread(counter_name)
        self.custom_mutex.lock_with_key(key=key)
        # record the key before starting the timer so the callback always sees it
        self.__counter_key_lock = counter_name
        self.__timeout_timer.start()

    def unlock_by_counter(self) -> None:
        """Unlocks the key held by this worker, if any"""
        with self.__state_lock:
            if self.__timeout_timer.is_alive():
                self.__timeout_timer.cancel()
            if self.__counter_key_lock:
                key = self.__format_key_lock_thread(self.__counter_key_lock)
                self.custom_mutex.unlock_with_key(key=key)
                self.__counter_key_lock = None
PS.: Two different worker types must not share the same key format (here, the COUNTER_WORKER_KEY_ prefix). Otherwise, their keys could collide in the shared mutex.
Now we can understand the importance of the context manager. If an exception is raised before the thread unlocks, __exit__ still runs and releases the key immediately instead of waiting for the timeout to expire.
Each worker type has its own key format. As soon as a thread acquires a key, the worker starts a threading.Timer, which runs in a separate thread. If the timeout expires before the key is released, the timer's callback releases it.
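The timeout relies on threading.Timer. A Timer runs its function in its own thread after the given delay, unless cancel() is called first. The sketch below shows both paths. The fired list exists only for illustration.

```python
import threading
from time import sleep

fired: list[str] = []

# a timer that is allowed to expire: the callback runs in the timer's own thread
expiring = threading.Timer(0.05, function=lambda: fired.append("expired"))
expiring.start()

# a timer cancelled before its delay: the callback never runs
cancelled = threading.Timer(0.05, function=lambda: fired.append("cancelled"))
cancelled.start()
cancelled.cancel()

sleep(0.2)  # give the first timer time to fire
expiring.join()
cancelled.join()
print(fired)  # ['expired']
```

Like any Thread, a Timer can only be started once. This is why creating the Timer in the worker's __init__ works only because a new worker is created for every call.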
5. Testing
It's time to apply what we have built to the previous problem and check the results.
5.1. Implementing Worker
Applying the worker to the increase function:
from time import sleep

from worker import CounterMutexWorker

def increase(by: int, counter: Counter):
    worker = CounterMutexWorker()
    with worker:  # __exit__ releases the key even if the body raises
        worker.lock_by_counter(counter.name)
        local_counter = counter.value
        local_counter += by
        sleep(0.1)  # simulating process time
        counter.value = local_counter
        worker.unlock_by_counter()  # explicit release; the call in __exit__ then does nothing
5.1.1. Results After Mutex
Final value first_counter: 6
Final value second_counter: 2
Final value third_counter: 5
Final value fourth_counter: 3
Final value fifth_counter: 2
Final value sixth_counter: 3
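Execution time: 0.608 seconds
The counters are correct again. The run now takes roughly as long as the busiest key: first_counter is incremented by 6 threads, and 6 × 0.1 s ≈ 0.6 s. Meanwhile, threads using other counters run in parallel.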
5.2. Testing timeout
Since we implemented a timeout, let's check how it behaves. We can test it by decreasing self.__timeout from 5 to 0.02 seconds, which is shorter than the 0.1 seconds each thread spends in sleep.
class CounterMutexWorker(MutexWorkerAbstract):
    def __init__(self):
        self.__general_mutex = CustomMutex()
        self.__counter_key_lock: str | None = None
        self.__timeout = 0.02  # seconds, shorter than the 0.1 s of simulated work
        self.__timeout_timer = threading.Timer(
            self.__timeout,
            function=self.__timeout_callback
        )
        # ... the rest of __init__ and the other methods stay the same
5.2.1. Timeout Results
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=fifth_counter
Lock timeout! worker=CounterMutexWorker | key=second_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=fifth_counter
Releasing Lock... worker=CounterMutexWorker key=second_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=fifth_counter
Releasing Lock... worker=CounterMutexWorker key=fifth_counter
Lock timeout! worker=CounterMutexWorker | key=second_counter
Releasing Lock... worker=CounterMutexWorker key=second_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=fourth_counter
Releasing Lock... worker=CounterMutexWorker key=fourth_counter
Lock timeout! worker=CounterMutexWorker | key=sixth_counter
Releasing Lock... worker=CounterMutexWorker key=sixth_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Lock timeout! worker=CounterMutexWorker | key=third_counter
Releasing Lock... worker=CounterMutexWorker key=third_counter
Lock timeout! worker=CounterMutexWorker | key=first_counter
Releasing Lock... worker=CounterMutexWorker key=first_counter
Final value first_counter: 2
Final value second_counter: 1
Final value third_counter: 1
Final value fourth_counter: 1
Final value fifth_counter: 1
Final value sixth_counter: 1
Execution time: 0.203 seconds
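It took only about 0.2 seconds because each lock was released by its timeout after 0.02 seconds. This let the next thread in while the previous one was still working. It is also why the counters are wrong again (first_counter ended at 2 instead of 6): forcing a release trades correctness for progress.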
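Forcing a release from the timer thread frees the key while its owner is still working. An alternative, not used in the article, is to bound how long a thread waits instead. Lock.acquire accepts a timeout and returns False if the lock could not be taken, so mutual exclusion is never broken. The sketch below uses a single lock for brevity, although the same idea could apply to per-key locks. The function name try_increase is hypothetical.

```python
import threading
from time import sleep

lock = threading.Lock()


def try_increase(values: dict[str, int], key: str, wait_timeout: float) -> bool:
    """Increase values[key] only if the lock is obtained within wait_timeout seconds."""
    if not lock.acquire(timeout=wait_timeout):
        return False  # gave up waiting; nothing was modified
    try:
        local_value = values[key]
        sleep(0.1)  # processing time simulation
        values[key] = local_value + 1
        return True
    finally:
        lock.release()


values = {"first_counter": 0}
results: list[bool] = []
threads = [
    threading.Thread(target=lambda: results.append(try_increase(values, "first_counter", 0.02)))
    for _ in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
# the counter always equals the number of successful calls: no update is lost
print(values["first_counter"] == results.count(True))  # True
```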
5.3. Load Test
What if we increase the number of threads and counters? To test the performance of the mutex, let's use 1000 threads and 12 counters.
5.3.1. Execution
import threading
from random import choice
from time import time

if __name__ == "__main__":
    counters = [
        Counter(value=0, name="first_counter"),
        Counter(value=0, name="second_counter"),
        Counter(value=0, name="third_counter"),
        Counter(value=0, name="fourth_counter"),
        Counter(value=0, name="fifth_counter"),
        Counter(value=0, name="sixth_counter"),
        Counter(value=0, name="seventh_counter"),
        Counter(value=0, name="eighth_counter"),
        Counter(value=0, name="ninth_counter"),
        Counter(value=0, name="tenth_counter"),
        Counter(value=0, name="eleventh_counter"),
        Counter(value=0, name="twelfth_counter"),
    ]
    amount_of_threads = 1000
    threads = [
        threading.Thread(
            target=increase,
            args=(1, choice(counters))  # each thread picks a random counter
        )
        for _ in range(amount_of_threads)
    ]

    initial_time = time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    final_time = time()

    print("\n=======================================\n")
    for c in counters:
        print(f"Final value {c.name}:", c.value)
    print("\n=======================================\n")
    print(f"Execution time: {round(final_time - initial_time, 3)} seconds")
5.3.2. Results Using Simple Lock
=======================================
Final value first_counter: 74
Final value second_counter: 85
Final value third_counter: 85
Final value fourth_counter: 90
Final value fifth_counter: 92
Final value sixth_counter: 87
Final value seventh_counter: 85
Final value eighth_counter: 78
Final value ninth_counter: 85
Final value tenth_counter: 85
Final value eleventh_counter: 82
Final value twelfth_counter: 72
=======================================
Execution time: 100.403 seconds
5.3.3. Results Using Mutex
=======================================
Final value first_counter: 74
Final value second_counter: 85
Final value third_counter: 85
Final value fourth_counter: 90
Final value fifth_counter: 92
Final value sixth_counter: 87
Final value seventh_counter: 85
Final value eighth_counter: 78
Final value ninth_counter: 85
Final value tenth_counter: 85
Final value eleventh_counter: 82
Final value twelfth_counter: 72
=======================================
Execution time: 9.815 seconds
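These numbers match a simple back-of-the-envelope model. With one global lock, every increment waits for all the others, so a run takes about (number of threads) × 0.1 s. With one lock per key, keys progress in parallel, so a run takes about (largest count on a single key) × 0.1 s. The sketch below applies that model to the counts above. The function names are hypothetical, and the model ignores thread start-up overhead, which is why the measured times are slightly higher.

```python
WORK_TIME = 0.1  # seconds of simulated work per increment (the sleep in increase)

# final counter values from the load test above (1000 threads, 12 counters)
counts = [74, 85, 85, 90, 92, 87, 85, 78, 85, 85, 82, 72]


def global_lock_time(per_key_counts: list[int]) -> float:
    """One lock for everything: all increments run one after another."""
    return sum(per_key_counts) * WORK_TIME


def per_key_lock_time(per_key_counts: list[int]) -> float:
    """One lock per key: keys run in parallel, so the busiest key dominates."""
    return max(per_key_counts) * WORK_TIME


estimated_global = global_lock_time(counts)
estimated_per_key = per_key_lock_time(counts)
print(round(estimated_global, 1))   # 100.0 (measured: 100.403 s)
print(round(estimated_per_key, 1))  # 9.2 (measured: 9.815 s)
print(f"{1 - estimated_per_key / estimated_global:.1%}")  # 90.8%
```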
6. Conclusion
After implementing the mutex, we managed to reduce the execution time considerably:
- 21 threads and 6 counters: from 2.108 seconds to 0.608 seconds, a time reduction of 71.2%.
- 1000 threads and 12 counters: from 100.403 seconds to 9.815 seconds, a time reduction of 90.2%.
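The gain grew with more threads and counters. The total time is bounded by the busiest key (92 increments × 0.1 s ≈ 9.2 s in the load test) rather than by the total number of threads (1000 × 0.1 s ≈ 100 s). This makes the approach a good fit for large systems where contention is spread across many independent keys. If most threads compete for the same key, it behaves much like a single lock.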
Furthermore, the mutex + worker combination is scalable. If another kind of lock is needed in the same fashion, simply implement a new worker that inherits from MutexWorkerAbstract and uses its own key format.
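To illustrate that extension point, here is a hypothetical AccountMutexWorker (not from the original repository) that locks by account ID. To keep the snippet runnable on its own, SimpleKeyedMutex is a minimal stand-in for CustomMutex that never discards unused locks, and timeout handling is omitted. The part that matters is the distinct key prefix.

```python
import threading
from abc import ABC, abstractmethod


class SimpleKeyedMutex:
    """Minimal stand-in for CustomMutex: one lock per key, never cleaned up."""

    def __init__(self) -> None:
        self._locks: dict[str, threading.Lock] = {}
        self._registry_lock = threading.Lock()

    def lock_with_key(self, key: str) -> None:
        with self._registry_lock:
            key_lock = self._locks.setdefault(key, threading.Lock())
        key_lock.acquire()

    def unlock_with_key(self, key: str) -> None:
        with self._registry_lock:
            key_lock = self._locks.get(key)
        if key_lock is not None:
            key_lock.release()


SHARED_MUTEX = SimpleKeyedMutex()  # plays the role of the CustomMutex singleton


class MutexWorkerAbstract(ABC):
    @abstractmethod
    def __enter__(self) -> "MutexWorkerAbstract": ...

    @abstractmethod
    def __exit__(self, exc_type, exc_value, exc_tb) -> None: ...

    @property
    @abstractmethod
    def custom_mutex(self) -> SimpleKeyedMutex: ...


class AccountMutexWorker(MutexWorkerAbstract):
    """Hypothetical worker: serializes operations on the same account."""

    def __init__(self) -> None:
        self.__account_id: str | None = None

    @property
    def custom_mutex(self) -> SimpleKeyedMutex:
        return SHARED_MUTEX

    def __enter__(self) -> "AccountMutexWorker":
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        self.unlock_by_account()

    @staticmethod
    def __format_key(account_id: str) -> str:
        # a prefix no other worker uses, so keys never collide in the shared mutex
        return f"ACCOUNT_WORKER_KEY_{account_id}"

    def lock_by_account(self, account_id: str) -> None:
        self.custom_mutex.lock_with_key(self.__format_key(account_id))
        self.__account_id = account_id

    def unlock_by_account(self) -> None:
        if self.__account_id is not None:
            self.custom_mutex.unlock_with_key(self.__format_key(self.__account_id))
            self.__account_id = None


balances = {"acc-1": 0}


def deposit(account_id: str, amount: int) -> None:
    with AccountMutexWorker() as worker:
        worker.lock_by_account(account_id)
        balances[account_id] += amount


threads = [threading.Thread(target=deposit, args=("acc-1", 10)) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(balances["acc-1"])  # 200
```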