DEV Community

Sushant Gaurav

Multithreading: Interview Questions and Practice Problems

Before moving into the interview questions and practice set, check out these articles to understand the topic in detail:

Interview Questions

What will happen when exceptions occur in one thread? Will it impact the other threads?

If an uncaught exception occurs inside one thread in Python, it terminates only that thread, not the entire program or the other running threads. Each thread runs independently, so an exception in one does not directly affect the others. However, if the thread was holding a shared resource such as a lock and exits without releasing it, other threads can end up blocked or deadlocked; acquiring locks with a with statement avoids this, because the lock is released even when an exception is raised. The exception is also not propagated to the main thread: by default its traceback is only printed to stderr. It is therefore good practice to wrap thread logic in try-except blocks, or to use threading.excepthook (Python 3.8+) or custom error handling, to capture and manage errors gracefully across threads.
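A minimal sketch of the threading.excepthook approach mentioned above. The names captured, record_exception, faulty, and healthy are made up for this illustration; only threading.excepthook and the attributes on its argument come from the standard library.

```python
import threading

captured = []  # hypothetical list used to collect failures from threads

def record_exception(args):
    # args carries exc_type, exc_value, exc_traceback and thread
    captured.append((args.thread.name, args.exc_type.__name__, str(args.exc_value)))

# Replace the default hook (which prints the traceback to stderr)
threading.excepthook = record_exception

def faulty():
    raise ValueError("bad input")

def healthy(results):
    results.append("ok")

results = []
t1 = threading.Thread(target=faulty, name="faulty-worker")
t2 = threading.Thread(target=healthy, args=(results,), name="healthy-worker")
t1.start()
t2.start()
t1.join()
t2.join()

print(captured)  # [('faulty-worker', 'ValueError', 'bad input')]
print(results)   # ['ok']
```

The failing thread ends, the healthy one still completes, and the hook gives the program one place to log or react to the error.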

Difference between daemon and non-daemon threads.

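A daemon thread runs in the background and is stopped abruptly when the program exits, that is, once the main thread and all other non-daemon threads have finished. It is commonly used for background tasks (logging, monitoring).

A non-daemon thread keeps running until it finishes, even if the main thread ends; the interpreter waits for it before exiting.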

Example:

t = Thread(target=worker, daemon=True)  # daemon thread
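To see the difference in a runnable form, here is a small sketch. The functions background_task and finite_task and the list log are assumptions made for this example, not part of any library.

```python
import threading
import time

def background_task():
    # Loops forever; acceptable only because the thread is a daemon
    while True:
        time.sleep(0.1)

def finite_task(log):
    time.sleep(0.2)  # simulate some work
    log.append("finished")

log = []
d = threading.Thread(target=background_task, daemon=True)  # daemon thread
n = threading.Thread(target=finite_task, args=(log,))      # non-daemon thread
d.start()
n.start()

n.join()  # wait for the non-daemon thread only

print(d.daemon, n.daemon)  # True False
print(log)                 # ['finished']
print(d.is_alive())        # True
```

The daemon thread is still alive at the last line, yet the program exits right after it, because only non-daemon threads keep the interpreter running.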

What is a race condition, and how to solve race condition problems?

A race condition occurs when multiple threads read and write shared data at the same time without synchronisation, so the result depends on how their operations happen to interleave. Locks, RLocks, Semaphores, Conditions, and Queues can be used to solve race condition problems.

Explain context switching overhead.

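Context switching means saving the state of one thread and loading the state of another. The overhead is the extra CPU cycles spent on this bookkeeping, plus the cache misses that follow once a different thread starts running. With too many threads, performance drops because more time goes into switching than into useful work.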

When should one not use threads (GIL issue)?

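The GIL (Global Interpreter Lock) in CPython allows only one thread to run Python bytecode at a time, so threads do not improve performance for CPU-heavy tasks (matrix multiplication, prime numbers). For such work, use multiprocessing, or libraries such as NumPy or Cython code that release the GIL during heavy computation. Threads remain useful for I/O-bound tasks, because the GIL is released while a thread waits on I/O.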

Can we return something from the target function of the thread?

A thread’s target function does not directly return values. Some of the workarounds are:

  • Use a shared variable with proper locking.
  • Use concurrent.futures.ThreadPoolExecutor (future objects return results).
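Both workarounds can be sketched in a few lines. The names square, square_into, shared, and shared_lock are hypothetical helpers for this example; Thread, Lock, and ThreadPoolExecutor are the standard library pieces.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Thread, Lock

def square(n):
    return n * n

# Workaround 1: store the result in a shared dict guarded by a lock
shared = {}
shared_lock = Lock()

def square_into(n):
    result = square(n)
    with shared_lock:
        shared[n] = result

t = Thread(target=square_into, args=(6,))
t.start()
t.join()

# Workaround 2: a Future hands the return value back to the caller
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(square, 7)
    value = future.result()  # blocks until done; re-raises worker exceptions

print(shared[6], value)  # 36 49
```

The Future route is usually less code, and it also surfaces exceptions from the worker in the calling thread.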

Difference between process, thread, and coroutine.

  • Process: Independent execution unit with separate memory.
  • Thread: Lightweight unit inside a process, shares memory.
  • Coroutine: Cooperatively scheduled, lightweight execution unit (async/await).
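For contrast with threads, here is a rough sketch of cooperative scheduling with asyncio. The coroutine task and the list order are assumptions for this illustration. Both coroutines run in a single thread and switch only at await.

```python
import asyncio

order = []

async def task(name, delay):
    order.append(f"{name} start")
    await asyncio.sleep(delay)  # hands control back to the event loop
    order.append(f"{name} end")

async def main():
    # Run both coroutines concurrently in one thread
    await asyncio.gather(task("a", 0.1), task("b", 0.05))

asyncio.run(main())
print(order)  # ['a start', 'b start', 'b end', 'a end']
```

Task "b" finishes first because "a" gave up control at its await and its sleep is longer.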

Ways to create threads.

  1. Using threading.Thread(target=func).
  2. Extending Thread class and overriding run().

What is the advantage of creating a custom thread class?

  • Allows initialisation via the constructor.
  • Encapsulation of task logic in run().
  • Easier to manage thread-specific attributes.

What is the difference between Thread identifier and Native Identifier?

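  • ident: Python's thread identifier, a non-zero integer that is None until the thread has started; it may be reused after the thread exits.
  • Native ID (native_id): Thread ID assigned by the OS (since Python 3.8); also None until the thread has started.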
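A quick way to look at both identifiers, sketched under the assumption of a platform that supports native_id (for example Linux, macOS, or Windows). The function work is a placeholder.

```python
import threading

def work():
    pass  # placeholder task

t = threading.Thread(target=work)
before = (t.ident, t.native_id)  # both are None before start()
t.start()
t.join()
after = (t.ident, t.native_id)   # integers; the actual values differ per run

print(before)                     # (None, None)
print(isinstance(after[0], int))  # True
```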

What is the thread lifecycle?

  • New -> Runnable -> Running -> Waiting/Blocked -> Terminated
  • Managed by Python and the OS scheduler.
  • Use start(), join(), is_alive() to manage.
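The lifecycle can be observed with is_alive(). This is a minimal sketch; gate, work, and states are names invented for the example, and an Event is used only to hold the thread in the waiting state.

```python
import threading

gate = threading.Event()

def work():
    gate.wait()  # Waiting/Blocked until the main thread sets the event

t = threading.Thread(target=work)
states = [t.is_alive()]      # New: created but not started
t.start()
states.append(t.is_alive())  # started and currently blocked on the event
gate.set()                   # let the thread continue
t.join()                     # wait for it to terminate
states.append(t.is_alive())  # Terminated

print(states)  # [False, True, False]
```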

Explain synchronisation primitives.

  • Lock: Mutual exclusion (one thread at a time).
  • RLock: Same as Lock, but reentrant (thread can acquire it multiple times).
  • Semaphore: Allows a fixed number of threads to access a resource.
  • Condition: Allows threads to wait for a certain state (producer-consumer).
  • Event: Threads wait for an event to be set.
  • Barrier: All threads must reach a point before continuing.
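Three of the primitives above can be sketched together: a Semaphore limiting concurrency, a Lock protecting counters, and a Barrier separating two phases. All function and variable names here (slots, state_lock, limited_work, phased_work, events) are assumptions for the illustration.

```python
import threading
import time

# Semaphore: at most 2 threads may hold a slot at the same time
slots = threading.Semaphore(2)
state_lock = threading.Lock()  # Lock: protects the shared state below
active = 0
peak = 0

def limited_work():
    global active, peak
    with slots:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulate work while holding a slot
        with state_lock:
            active -= 1

# Barrier: no thread continues until all 3 have arrived
barrier = threading.Barrier(3)
events = []

def phased_work():
    with state_lock:
        events.append("before")
    barrier.wait()
    with state_lock:
        events.append("after")

threads = [threading.Thread(target=limited_work) for _ in range(5)]
threads += [threading.Thread(target=phased_work) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak <= 2)  # True
print(events)     # ['before', 'before', 'before', 'after', 'after', 'after']
```

Every "before" is recorded ahead of any "after" because the barrier releases the three threads only once all of them have called wait().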

What is Thread-local storage?

Thread-local storage gives each thread its own copy of an attribute, so a value set in one thread is not visible to, and does not interfere with, the others.

Example:

import threading
local = threading.local()
local.value = 42
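The isolation becomes visible once more than one thread touches the same threading.local() object. In this sketch, worker and seen are hypothetical names used only to record what each thread observes.

```python
import threading

local = threading.local()
local.value = 42  # set in the main thread only
seen = {}

def worker(name, number):
    # A new thread starts without any 'value' attribute of its own
    had_value = hasattr(local, "value")
    local.value = number  # visible only inside this thread
    seen[name] = (had_value, local.value)

threads = [threading.Thread(target=worker, args=(f"w{i}", i)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(seen.items()))  # [('w0', (False, 0)), ('w1', (False, 1))]
print(local.value)           # 42
```

The main thread still sees 42 at the end, even though both workers assigned their own value.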

What is the difference between Thread Pools and Manual Threading?

  • Manual Threading: Full control, but harder to scale.
  • Thread Pools (ThreadPoolExecutor): Threads are reused, tasks queued automatically.
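A small sketch of thread reuse in a pool: six tasks are served by at most two worker threads. The function task and the "pool" name prefix are choices made for this example.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

def task(n):
    # Return the result together with the name of the thread that ran it
    return n * 2, threading.current_thread().name

with ThreadPoolExecutor(max_workers=2, thread_name_prefix="pool") as pool:
    results = list(pool.map(task, range(6)))  # results keep the input order

values = [value for value, _ in results]
workers = {name for _, name in results}

print(values)             # [0, 2, 4, 6, 8, 10]
print(len(workers) <= 2)  # True
```

With manual threading the same job would need six Thread objects plus the start() and join() calls for each of them.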

Some common coding tasks asked in interviews

  • Print thread details (name, ident, native_id).
  • Class methods + static methods in threads.
  • Producer-consumer problem.
  • Demonstrate race condition + fix with locks.
  • Build a simple multithreaded downloader.

Practice

Create threads and print their details

from threading import Thread, current_thread
from time import sleep

# Worker function
def work():
  sleep(2)  # Simulate some work (2 seconds pause)
  print(f"WORKER THREAD DETAILS: Name - {current_thread().name}, ID - {current_thread().ident}")

# Create 2 threads
t1 = Thread(target=work)
t2 = Thread(target=work)

# Start the threads
t1.start()
t2.start()

# wait for them to finish
t1.join()
t2.join()

# Print main thread details
print(f"MAIN THREAD DETAILS: Name - {current_thread().name}, ID - {current_thread().ident}")

Output:

WORKER THREAD DETAILS: Name - Thread-1 (work), ID - 6150221824
WORKER THREAD DETAILS: Name - Thread-2 (work), ID - 6167048192
MAIN THREAD DETAILS: Name - MainThread, ID - 8456970432

Define a constructor in the inherited thread class

from threading import Thread, current_thread
from time import sleep

class SubClass(Thread):
  def __init__(self, delay):
    super().__init__()  # always initialise the base Thread first
    self.delay = delay  # thread-specific attribute set via the constructor

  # run() is the method that start() executes in the new thread
  def run(self):
    sleep(self.delay)  # Simulate some work
    print(f"Inside thread - {current_thread().name}")

# Creating thread objects directly from the subclass
t1 = SubClass(2)
t2 = SubClass(2)

# Starting the threads
t1.start()
t2.start()

# Waiting for the threads to finish
t1.join()
t2.join()

Output (Order can be different):

Inside thread - Thread-1
Inside thread - Thread-2

Create and run threads for different types of methods inside a class — specifically a class method, a static method, and a regular instance method

from threading import Thread, current_thread
from time import sleep

class SubClass:
  def work_1(self):
    sleep(2)
    print(f"Inside INSTANCE method thread - {current_thread().name}")

  @classmethod
  def work_2(cls):
    sleep(2)
    print(f"Inside CLASS method thread - {current_thread().name}")

  @staticmethod
  def work_3():
    sleep(2)
    print(f"Inside STATIC method thread - {current_thread().name}")

# Create threads for each method type
t1 = Thread(target=SubClass().work_1)       # instance method
t2 = Thread(target=SubClass.work_2)         # class method
t3 = Thread(target=SubClass.work_3)         # static method

# Start threads
t1.start()
t2.start()
t3.start()

# Wait for all threads to finish
t1.join()
t2.join()
t3.join()

Output (Order can be different):

Inside CLASS method thread - Thread-2 (work_2)
Inside INSTANCE method thread - Thread-1 (work_1)
Inside STATIC method thread - Thread-3 (work_3)

Create a multithreaded downloader (download multiple files in parallel)

from threading import Thread, current_thread
from time import sleep

# Download function
def download():
  sleep(2)  # Simulate downloading work (2 seconds pause)
  print(f"File downloaded by {current_thread().name} Thread!")

# Creating threads
t1 = Thread(target=download, name="Downloader-1")
t2 = Thread(target=download, name="Downloader-2")
t3 = Thread(target=download, name="Downloader-3")

# Start the threads
t1.start()
t2.start()
t3.start()

# wait for them to finish
t1.join()
t2.join()
t3.join()

Output (Order can be different):

File downloaded by Downloader-1 Thread!
File downloaded by Downloader-2 Thread!
File downloaded by Downloader-3 Thread!

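Demonstrate a race condition and fix it using a Lock.

Note: counter += 1 is a read-modify-write sequence, so it is not thread-safe without a lock. On recent CPython versions the unlocked run may still happen to end with the correct total (as in the sample output below), because thread switches rarely land in the middle of the increment. The lock is what guarantees the result.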

from threading import Thread, Lock

lock = Lock()

# global variable
counter = 0

# Function without lock (causes race condition)
def worker_without_lock(value):
  global counter
  for _ in range(value):
    counter += 1
  print(f"Counter without lock: {counter}")

# Function with lock (fixes race condition)
def worker_with_lock(value):
  global counter
  with lock:
    for _ in range(value):
      counter += 1
  print(f"Counter with lock: {counter}")


print("--- Running WITHOUT Lock ---")
counter = 0
t1 = Thread(target=worker_without_lock, args=(1000000,))
t2 = Thread(target=worker_without_lock, args=(1000000,))
t3 = Thread(target=worker_without_lock, args=(1000000,))

t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

print(f"Final counter without lock: {counter}")

print("--- Running WITH Lock ---")
counter = 0  # reset counter
t1 = Thread(target=worker_with_lock, args=(100000,))
t2 = Thread(target=worker_with_lock, args=(100000,))
t3 = Thread(target=worker_with_lock, args=(100000,))

t1.start(); t2.start(); t3.start()
t1.join(); t2.join(); t3.join()

print(f"Final counter with lock: {counter}")

Output:

--- Running WITHOUT Lock ---
Counter without lock: 1989806
Counter without lock: 2953264
Counter without lock: 3000000
Final counter without lock: 3000000
--- Running WITH Lock ---
Counter with lock: 100000
Counter with lock: 200000
Counter with lock: 300000
Final counter with lock: 300000

Implement a producer-consumer queue with threading.Condition and queue.Queue.

1. Using queue.Queue:

from threading import Thread
import queue, time, random

# Shared queue
q = queue.Queue()

# Producer function
def producer():
  for i in range(10):
    item = random.randint(1, 100)
    q.put(item)
    print(f"Produced {item}")
    time.sleep(random.random())  # simulate work

# Consumer function
def consumer():
  while True:
    item = q.get()
    if item is None:  # Exit signal
      break
    print(f"Consumed {item}")
    time.sleep(random.random())  # simulate work
    q.task_done()

# Create threads
producer_thread = Thread(target=producer)
consumer_thread = Thread(target=consumer)

# Start threads
producer_thread.start()
consumer_thread.start()

# Wait for producer to finish
producer_thread.join()

# Send exit signal to consumer
q.put(None)

# Wait for consumer to finish
consumer_thread.join()

Output:

Produced 57
Consumed 57
Produced 30
Consumed 30
Produced 17
Produced 1
Consumed 17
Consumed 1
Produced 46
Produced 28
Produced 16
Consumed 46
Produced 68
Consumed 28
Consumed 16
Produced 27
Consumed 68
Consumed 27
....

2. Using threading.Condition:

Code-1:

from threading import Thread, Condition
import time, random

condition = Condition()
queue = []

def producer():
  for i in range(10):
    item = random.randint(1, 10)
    with condition:
      queue.append(item)
      print(f"Produced {item}")
      condition.notify()  # wake up one consumer
    time.sleep(random.randint(1, 10))

def consumer():
  while True:
    with condition:
      while not queue:
        condition.wait()  # wait until producer adds something
      item = queue.pop(0)
      print(f"Consumed {item}")
    time.sleep(random.randint(1, 10))

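# Create threads
producer_thread = Thread(target=producer)
# daemon=True: the consumer loops forever, so it must not keep the program alive
consumer_thread = Thread(target=consumer, daemon=True)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
# Short grace period for the consumer; as a daemon thread it is stopped
# when the program exits, so items still in the queue may go unconsumed
time.sleep(2)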

Output:

Produced 3
Consumed 3
Produced 9
Consumed 9
Produced 8
Consumed 8
Produced 7
Consumed 7
Produced 6
Consumed 6
Produced 1
Consumed 1
Produced 10
Consumed 10
Produced 10
Consumed 10
Produced 4
Consumed 4
....

Code-2:

from threading import Thread, Condition
import time, random

condition = Condition()
queue = []
DONE = object()  # special marker

def producer():
  for i in range(10):
    item = random.randint(1, 10)
    with condition:
      queue.append(item)
      print(f"Produced {item}")
      condition.notify()  # wake up one consumer
    time.sleep(random.uniform(0.2, 1))  # shorter sleep

  # Send stop signal
  with condition:
    queue.append(DONE)
    condition.notify()

def consumer():
  while True:
    with condition:
      while not queue:
        condition.wait()  # wait until producer adds something
      item = queue.pop(0)
      if item is DONE:
        print("Consumer exiting.")
        break
      print(f"Consumed {item}")
    time.sleep(random.uniform(0.2, 1))

# Create threads
producer_thread = Thread(target=producer)
consumer_thread = Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

Output:

Produced 6
Consumed 6
Produced 6
Consumed 6
Produced 4
Consumed 4
Produced 5
Consumed 5
Produced 3
Consumed 3
Produced 4
Consumed 4
Produced 7
Consumed 7
Produced 10
Consumed 10
Produced 9
Consumed 9
Produced 5
Consumed 5
Consumer exiting.
