Before moving into the interview questions and practice set, check out these articles to understand the topic in detail:
- Python Concurrency: Processes, Threads, and Coroutines Explained
- Multithreading in Python: Lifecycle, Locks, and Thread Pools
Interview Questions
What will happen when exceptions occur in one thread? Will it impact the other threads?
If an exception occurs inside one thread in Python, it terminates only that particular thread, not the entire program or other running threads. Each thread runs independently, so an exception in one does not directly affect the execution of others. However, if the thread was holding a shared resource such as a lock and exits without releasing it, that can indirectly cause deadlocks or blocked threads. The exception is also not propagated to the main thread unless explicitly handled, so it’s good practice to wrap thread logic in try-except blocks and use mechanisms like threading.excepthook (Python 3.8+) or custom error handling to capture and manage errors across threads.
Difference between daemon and non-daemon threads.
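A daemon thread runs in the background and is stopped abruptly when the program exits, that is, once all non-daemon threads (including the main thread) have finished. It is commonly used for background tasks such as logging or monitoring.
A non-daemon thread keeps the program alive until it finishes, even if the main thread has already reached its end.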
Example:
t = Thread(target=worker, daemon=True) # daemon thread
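A slightly fuller sketch of the same idea, assuming a hypothetical background_task function that loops until an Event is set. It only shows how the daemon flag is set and inspected; the difference in exit behaviour itself shows up when the main program ends while such threads are still running.

```python
import threading
import time

def background_task(stop_event):
    # Hypothetical worker: polls until asked to stop
    while not stop_event.is_set():
        time.sleep(0.01)

stop = threading.Event()
daemon_thread = threading.Thread(target=background_task, args=(stop,), daemon=True)
normal_thread = threading.Thread(target=background_task, args=(stop,))

# A new thread inherits the daemon flag of the thread that creates it,
# so a thread created from the (non-daemon) main thread defaults to False.
print(daemon_thread.daemon)

daemon_thread.start()
normal_thread.start()
stop.set()            # ask both workers to finish
daemon_thread.join()
normal_thread.join()
```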
What is a race condition, and how can race condition problems be solved?
A race condition occurs when multiple threads read and write shared data without coordination, so the result depends on how their operations interleave. Locks, RLocks, Semaphores, and Conditions can protect the shared data, and thread-safe Queues avoid sharing it directly.
Explain context switching overhead.
Context switching means saving the state of one thread and loading the state of another. The overhead is the extra CPU cycles spent on the switch plus the cache misses that follow it. With too many threads, performance drops due to excessive switching.
When should one not use threads (GIL issue)?
The GIL (Global Interpreter Lock) allows only one thread to execute Python bytecode at a time in a standard CPython build. Threads therefore do not speed up CPU-heavy tasks (matrix multiplication, prime number search). For such work, use multiprocessing, or rely on NumPy or Cython code that releases the GIL.
Can we return something from the target function of the thread?
A thread’s target function does not directly return values. Some of the workarounds are:
- Use a shared variable with proper locking.
- Use concurrent.futures.ThreadPoolExecutor (future objects return results).
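A minimal sketch of the second workaround, assuming a hypothetical square function as the task. submit() returns a Future, and result() blocks until the task has finished and hands back its return value.

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    # Hypothetical task whose return value we want back in the caller
    return n * n

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 7)  # schedule the call on a pool thread
    result = future.result()             # wait for it and fetch the return value

print(result)  # 49
```

If the task raises an exception, result() re-raises it in the calling thread, which also makes errors easier to notice than with a bare Thread.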
Difference between process, thread, and coroutine.
- Process: Independent execution unit with separate memory.
- Thread: Lightweight unit inside a process, shares memory.
- Coroutine: Cooperatively scheduled, lightweight execution unit (async/await).
Ways to create threads.
- Using threading.Thread(target=func).
- Extending the Thread class and overriding run().
What is the advantage of creating a custom thread class?
- Allows initialisation via the constructor.
- Encapsulation of task logic in run().
- Easier to manage thread-specific attributes.
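The points above can be sketched with a small subclass. SumWorker is a hypothetical class invented for this example: it takes its input through the constructor, keeps the task logic in run(), and stores the outcome on the instance.

```python
from threading import Thread

class SumWorker(Thread):
    """Hypothetical custom thread that sums a list of numbers."""

    def __init__(self, numbers):
        super().__init__(name="sum-worker")  # always initialise the base Thread
        self.numbers = numbers               # input passed via the constructor
        self.total = None                    # thread-specific attribute for the result

    def run(self):
        # Executed in the new thread once start() is called
        self.total = sum(self.numbers)

worker = SumWorker([1, 2, 3, 4])
worker.start()
worker.join()
print(worker.name, worker.total)  # sum-worker 10
```

Storing the result on the instance is also one way to get a value back from a thread without a pool.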
What is the difference between Thread identifier and Native Identifier?
- Identifier (ident): Python’s internal ID for a thread.
- Native ID (native_id): OS-level thread ID (since Python 3.8).
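A short sketch for comparing the two values, assuming a platform where native_id is available (it is not supported everywhere). The info dict and report function are hypothetical helpers. Both attributes are None until the thread has been started.

```python
import threading

info = {}  # hypothetical dict for passing the IDs back to the main thread

def report():
    current = threading.current_thread()
    info["ident"] = current.ident          # Python-level identifier
    info["native_id"] = current.native_id  # identifier assigned by the OS kernel

t = threading.Thread(target=report)
ids_before_start = (t.ident, t.native_id)
print(ids_before_start)  # (None, None)

t.start()
t.join()
print(info)  # two integers; the actual values differ from run to run
```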
What is the thread lifecycle?
- New -> Runnable -> Running -> Waiting/Blocked -> Terminated
- Managed by Python and the OS scheduler.
- Use start(), join(), and is_alive() to manage.
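The lifecycle can be observed roughly as follows. Python does not expose the state names listed above directly, so this sketch only checks is_alive() at three points; the gate Event and the states list are hypothetical helpers used to hold the worker in a waiting state.

```python
import threading

gate = threading.Event()
states = []  # hypothetical list recording is_alive() at each stage

def worker():
    gate.wait()  # blocked until the main thread sets the event

t = threading.Thread(target=worker)
states.append(t.is_alive())  # new: created but not started
t.start()
states.append(t.is_alive())  # started: currently waiting on the event
gate.set()                   # let the worker continue
t.join()                     # wait for it to terminate
states.append(t.is_alive())  # terminated

print(states)  # [False, True, False]
```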
Explain synchronisation primitives.
- Lock: Mutual exclusion (one thread at a time).
- RLock: Same as Lock, but reentrant (thread can acquire it multiple times).
- Semaphore: Allows a fixed number of threads to access a resource.
- Condition: Allows threads to wait for a certain state (producer-consumer).
- Event: Threads wait for an event to be set.
- Barrier: All threads must reach a point before continuing.
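As one example from the list above, here is a sketch of a Semaphore capping how many threads may use a resource at once. MAX_CONCURRENT, limited_task, and the active/peak counters are assumptions made for this illustration; a Lock guards the counters themselves.

```python
import threading
import time

MAX_CONCURRENT = 2                       # assumed limit for this example
slots = threading.Semaphore(MAX_CONCURRENT)
state_lock = threading.Lock()            # protects the two counters below
active = 0                               # threads currently inside the limited section
peak = 0                                 # highest value of active ever seen

def limited_task():
    global active, peak
    with slots:                          # at most MAX_CONCURRENT threads get past here
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)                 # simulate using the resource
        with state_lock:
            active -= 1

threads = [threading.Thread(target=limited_task) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Peak concurrency: {peak}")       # never more than MAX_CONCURRENT
```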
What is Thread-local storage?
It allows thread-specific data without interfering with others.
Example:
import threading
local = threading.local()
local.value = 42
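To make the isolation visible, the sketch below extends the example with a few worker threads. The worker function and the observed dict are hypothetical. Each thread starts without the attribute set by the main thread and then stores its own value.

```python
import threading

local = threading.local()
local.value = 42   # set in the main thread only
observed = {}      # hypothetical dict collecting what each worker saw

def worker(number):
    name = threading.current_thread().name
    had_value = hasattr(local, "value")  # False: the main thread's value is not visible here
    local.value = number                 # private to this thread
    observed[name] = (had_value, local.value)

threads = [
    threading.Thread(target=worker, args=(i,), name=f"worker-{i}")
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(local.value)  # 42, untouched by the workers
print(observed)
```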
What is the difference between Thread Pools and Manual Threading?
- Manual Threading: Full control, but harder to scale.
- Thread Pools (ThreadPoolExecutor): Threads are reused, tasks queued automatically.
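A brief sketch of thread reuse in a pool, assuming a hypothetical task function that doubles its input and reports which thread ran it. Ten tasks are queued, but only up to three worker threads ever exist.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def task(n):
    # Hypothetical task: returns a result plus the name of the thread that ran it
    return n * 2, threading.current_thread().name

with ThreadPoolExecutor(max_workers=3, thread_name_prefix="pool") as executor:
    outcomes = list(executor.map(task, range(10)))  # results come back in input order

results = [value for value, _ in outcomes]
worker_names = {name for _, name in outcomes}

print(results)                 # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(len(worker_names) <= 3)  # True
```

With manual threading, the same job would need explicit Thread objects, start() and join() calls, and a separate way to collect the results.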
Some common coding tasks asked in interviews
- Print thread details (name, ident, native_id).
- Class methods + static methods in threads.
- Producer-consumer problem.
- Demonstrate race condition + fix with locks.
- Build a simple multithreaded downloader.
Practice
Create threads and print their details
from threading import Thread, current_thread
from time import sleep
# Worker function
def work():
    sleep(2)  # Simulate some work (2 seconds pause)
    print(f"WORKER THREAD DETAILS: Name - {current_thread().name}, ID - {current_thread().ident}")
# Create 2 threads
t1 = Thread(target=work)
t2 = Thread(target=work)
# Start the threads
t1.start()
t2.start()
# wait for them to finish
t1.join()
t2.join()
# Print main thread details
print(f"MAIN THREAD DETAILS: Name - {current_thread().name}, ID - {current_thread().ident}")
Output:
WORKER THREAD DETAILS: Name - Thread-1 (work), ID - 6150221824
WORKER THREAD DETAILS: Name - Thread-2 (work), ID - 6167048192
MAIN THREAD DETAILS: Name - MainThread, ID - 8456970432
Define a constructor in the inherited thread class
from threading import Thread, current_thread
from time import sleep
class SubClass(Thread):
    def __init__(self):
        super().__init__()

    def work(self):
        sleep(2)  # Simulate some work (2 seconds pause)
        print(f"Inside thread - {current_thread().name}")
# Creating object of SubClass
obj1 = SubClass()
# Creating objects
t1 = Thread(target=obj1.work)
t2 = Thread(target=SubClass().work)
# Starting the threads
t1.start()
t2.start()
# Waiting for the threads to finish
t1.join()
t2.join()
Output:
Inside thread - Thread-2 (work)
Inside thread - Thread-4 (work)
Create and run threads for different types of methods inside a class — specifically a class method, a static method, and a regular instance method
from threading import Thread, current_thread
from time import sleep
class SubClass:
    def work_1(self):
        sleep(2)
        print(f"Inside INSTANCE method thread - {current_thread().name}")

    @classmethod
    def work_2(cls):
        sleep(2)
        print(f"Inside CLASS method thread - {current_thread().name}")

    @staticmethod
    def work_3():
        sleep(2)
        print(f"Inside STATIC method thread - {current_thread().name}")
# Create threads for each method type
t1 = Thread(target=SubClass().work_1) # instance method
t2 = Thread(target=SubClass.work_2) # class method
t3 = Thread(target=SubClass.work_3) # static method
# Start threads
t1.start()
t2.start()
t3.start()
# Wait for all threads to finish
t1.join()
t2.join()
t3.join()
Output (Order can be different):
Inside CLASS method thread - Thread-2 (work_2)
Inside INSTANCE method thread - Thread-1 (work_1)
Inside STATIC method thread - Thread-3 (work_3)
Create a multithreaded downloader (download multiple files in parallel)
from threading import Thread, current_thread
from time import sleep
# Download function
def download():
    sleep(2)  # Simulate downloading work (2 seconds pause)
    print(f"File downloaded by {current_thread().name} Thread!")
# Creating threads
t1 = Thread(target=download, name="Downloader-1")
t2 = Thread(target=download, name="Downloader-2")
t3 = Thread(target=download, name="Downloader-3")
# Start the threads
t1.start()
t2.start()
t3.start()
# wait for them to finish
t1.join()
t2.join()
t3.join()
Output (Order can be different):
File downloaded by Downloader-1 Thread!
File downloaded by Downloader-2 Thread!
File downloaded by Downloader-3 Thread!
Demonstrate a race condition and fix it using a Lock.
from threading import Thread, Lock
lock = Lock()
# global variable
counter = 0
# Function without lock (causes race condition)
def worker_without_lock(value):
    global counter
    for _ in range(value):
        counter += 1
    print(f"Counter without lock: {counter}")

# Function with lock (fixes race condition)
def worker_with_lock(value):
    global counter
    with lock:
        for _ in range(value):
            counter += 1
        print(f"Counter with lock: {counter}")
print("--- Running WITHOUT Lock ---")
counter = 0
t1 = Thread(target=worker_without_lock, args=(1000000,))
t2 = Thread(target=worker_without_lock, args=(1000000,))
t3 = Thread(target=worker_without_lock, args=(1000000,))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()
print(f"Final counter without lock: {counter}")
print("--- Running WITH Lock ---")
counter = 0 # reset counter
t1 = Thread(target=worker_with_lock, args=(100000,))
t2 = Thread(target=worker_with_lock, args=(100000,))
t3 = Thread(target=worker_with_lock, args=(100000,))
t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()
print(f"Final counter with lock: {counter}")
Output:
--- Running WITHOUT Lock ---
Counter without lock: 1989806
Counter without lock: 2953264
Counter without lock: 3000000
Final counter without lock: 3000000
--- Running WITH Lock ---
Counter with lock: 100000
Counter with lock: 200000
Counter with lock: 300000
Final counter with lock: 300000
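Lost updates are timing-dependent, so an unlocked run can still end at the correct total, as the sample run above does. The sketch below is an assumed variation, not part of the original exercise. It uses a Barrier to force every thread to read the counter before any thread writes it, which makes the lost update reproducible. N, unsafe_increment, safe_increment, and run are hypothetical names.

```python
from threading import Thread, Lock, Barrier

N = 4                  # number of threads (assumed for this example)
counter = 0
barrier = Barrier(N)
lock = Lock()

def unsafe_increment():
    global counter
    snapshot = counter      # read the shared value
    barrier.wait()          # every thread reads before any thread writes
    counter = snapshot + 1  # write based on a stale read

def safe_increment():
    global counter
    with lock:              # read and write happen as one protected step
        snapshot = counter
        counter = snapshot + 1

def run(target):
    global counter
    counter = 0
    threads = [Thread(target=target) for _ in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter

unsafe_total = run(unsafe_increment)
safe_total = run(safe_increment)
print(f"Without lock: {unsafe_total}")  # Without lock: 1
print(f"With lock: {safe_total}")       # With lock: 4
```

All four unlocked threads read 0 and each writes 1, so three increments are lost. With the lock, each thread sees the previous thread's write.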
Implement a producer-consumer queue with threading.Condition and queue.Queue.
1. Using queue.Queue:
from threading import Thread
import queue, time, random
# Shared queue
q = queue.Queue()
# Producer function
def producer():
    for i in range(10):
        item = random.randint(1, 100)
        q.put(item)
        print(f"Produced {item}")
        time.sleep(random.random())  # simulate work

# Consumer function
def consumer():
    while True:
        item = q.get()
        if item is None:  # Exit signal
            break
        print(f"Consumed {item}")
        time.sleep(random.random())  # simulate work
        q.task_done()
# Create threads
producer_thread = Thread(target=producer)
consumer_thread = Thread(target=consumer)
# Start threads
producer_thread.start()
consumer_thread.start()
# Wait for producer to finish
producer_thread.join()
# Send exit signal to consumer
q.put(None)
# Wait for consumer to finish
consumer_thread.join()
Output:
Produced 57
Consumed 57
Produced 30
Consumed 30
Produced 17
Produced 1
Consumed 17
Consumed 1
Produced 46
Produced 28
Produced 16
Consumed 46
Produced 68
Consumed 28
Consumed 16
Produced 27
Consumed 68
Consumed 27
....
2. Using threading.Condition:
Code-1:
from threading import Thread, Condition
import time, random
condition = Condition()
queue = []
def producer():
    for i in range(10):
        item = random.randint(1, 10)
        with condition:
            queue.append(item)
            print(f"Produced {item}")
            condition.notify()  # wake up one consumer
        time.sleep(random.randint(1, 10))

def consumer():
    while True:
        with condition:
            while not queue:
                condition.wait()  # wait until producer adds something
            item = queue.pop(0)
            print(f"Consumed {item}")
        time.sleep(random.randint(1, 10))
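# Create threads; the consumer is a daemon so its endless loop does not keep the program alive
producer_thread = Thread(target=producer)
consumer_thread = Thread(target=consumer, daemon=True)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
# Give the consumer a moment to drain the queue; the daemon thread is stopped when the main thread exits
time.sleep(2)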
Output:
Produced 3
Consumed 3
Produced 9
Consumed 9
Produced 8
Consumed 8
Produced 7
Consumed 7
Produced 6
Consumed 6
Produced 1
Consumed 1
Produced 10
Consumed 10
Produced 10
Consumed 10
Produced 4
Consumed 4
....
Code-2:
from threading import Thread, Condition
import time, random
condition = Condition()
queue = []
DONE = object() # special marker
def producer():
    for i in range(10):
        item = random.randint(1, 10)
        with condition:
            queue.append(item)
            print(f"Produced {item}")
            condition.notify()  # wake up one consumer
        time.sleep(random.uniform(0.2, 1))  # shorter sleep
    # Send stop signal
    with condition:
        queue.append(DONE)
        condition.notify()

def consumer():
    while True:
        with condition:
            while not queue:
                condition.wait()  # wait until producer adds something
            item = queue.pop(0)
            if item is DONE:
                print("Consumer exiting.")
                break
            print(f"Consumed {item}")
        time.sleep(random.uniform(0.2, 1))
# Create threads
producer_thread = Thread(target=producer)
consumer_thread = Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
Output:
Produced 6
Consumed 6
Produced 6
Consumed 6
Produced 4
Consumed 4
Produced 5
Consumed 5
Produced 3
Consumed 3
Produced 4
Consumed 4
Produced 7
Consumed 7
Produced 10
Consumed 10
Produced 9
Consumed 9
Produced 5
Consumed 5
Consumer exiting.