Using `ReentrantLock` and `Condition`, we decouple monitor lock semantics from object-level intrinsic locks (`synchronized`). This allows:
- Multiple condition queues (e.g., one for producers, one for consumers)
- Interruptible lock acquisition
- Timed waits
- Greater flexibility, and often less wasted wake-up work under contention, because a signal can target only the threads that can actually proceed
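The interruptible-acquisition and timed-wait points in the list above can be sketched as follows. This is a minimal illustration under stated assumptions, not the queue from this article: `TimedBoundedQueue`, `offer`, and `poll` are hypothetical names chosen for the example. It relies only on `ReentrantLock.lockInterruptibly()` and `Condition.awaitNanos(long)`, both part of `java.util.concurrent.locks`.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration only.
class TimedBoundedQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TimedBoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    // Adds an item, waiting up to the timeout for space. Returns false on timeout.
    boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly(); // an interrupt aborts the wait for the lock itself
        try {
            while (items.size() == capacity) {
                if (nanos <= 0L) {
                    return false; // time budget exhausted, give up
                }
                // awaitNanos returns an estimate of the time still remaining
                nanos = notFull.awaitNanos(nanos);
            }
            items.add(item);
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Removes an item, waiting up to the timeout. Returns null on timeout.
    T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                if (nanos <= 0L) {
                    return null;
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            T item = items.remove();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```

Re-reading the remaining time from `awaitNanos` inside the loop keeps the total wait bounded even when the thread wakes up spuriously or loses the race for the freed slot.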
Refactored `BlockingQueue` Implementation
```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // for producers
    private final Condition notEmpty = lock.newCondition(); // for consumers

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public void enqueue(T item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // wait until there's space
            }
            queue.add(item);
            notEmpty.signal(); // wake up a waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public T dequeue() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // wait until there's an item
            }
            T item = queue.remove();
            notFull.signal(); // wake up a waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}
```
Execution Demo
Same usage example as before:
```java
public class ProducerConsumerWithLockDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new BlockingQueue<>(5);

        Runnable producer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    queue.enqueue(i);
                    System.out.println("Produced: " + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    return; // stop producing instead of looping with the flag set
                }
            }
        };

        Runnable consumer = () -> {
            for (int i = 0; i < 10; i++) {
                try {
                    int item = queue.dequeue();
                    System.out.println("Consumed: " + item);
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    return; // stop consuming instead of looping with the flag set
                }
            }
        };

        new Thread(producer).start();
        new Thread(consumer).start();
    }
}
```
Why This Is Better (Especially at Scale)
Feature | `synchronized` + `wait`/`notifyAll()` | `ReentrantLock` + `Condition` |
---|---|---|
Fine-grained condition control | No: one condition queue per monitor | Yes: multiple condition queues per lock |
Selective notification | No: `notifyAll()` wakes all waiters | Yes: `signal()` / `signalAll()` per group |
Interruptible locking | No | Yes: `lockInterruptibly()` |
Fairness policy | No: unfair | Yes: optional fairness on the lock |
Scalability under contention | Moderate | Often better, especially with high contention, because fewer threads are woken needlessly |
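The fairness row of the table can be illustrated with a short sketch. `FairCounter` is a hypothetical class invented for this example; the only library features it uses are the `ReentrantLock(boolean fair)` constructor and `isFair()`. Passing `true` asks the lock to favor the longest-waiting thread, which the Javadoc notes may lower overall throughput, so treat it as an opt-in rather than a default.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration only.
class FairCounter {
    private final ReentrantLock lock;
    private long count;

    // fair = true requests longest-waiting-first handoff; false is the default policy
    FairCounter(boolean fair) {
        this.lock = new ReentrantLock(fair);
    }

    void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    long get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    boolean isFair() {
        return lock.isFair();
    }
}
```

An intrinsic lock taken with `synchronized` offers no equivalent switch, which is the contrast the table draws.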
Real-World Usage Scenario
In high-concurrency systems such as:
- Thread pools (`ThreadPoolExecutor`)
- Resource-bound connection pools (e.g., JDBC connection pool)
- Event-driven systems with backpressure
- Message brokers / queues
…using `ReentrantLock` + `Condition` enables cleaner separation of concerns, optimized signaling, and lower contention overhead.
When to Use `signal()` vs `signalAll()`
In this example:
- `notFull.signal()`: one producer is unblocked when space becomes available.
- `notEmpty.signal()`: one consumer is unblocked when an item is produced.
Each `enqueue` adds exactly one item and each `dequeue` frees exactly one slot, so only one waiting thread can make progress per state change. That makes `signal()` both safe and performant here.
If a single state change could let several waiters proceed at once (e.g., a bulk insert or an operation that empties the queue), consider `signalAll()`, but assess the performance trade-offs.
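As a sketch of the case where `signalAll()` is the right call, consider an operation that frees many slots at once. `DrainableQueue` and its `drain()` method are hypothetical names for this illustration and are not part of the `BlockingQueue` class in this article. Because `put` only signals `notEmpty`, a single `notFull.signal()` in `drain()` would wake one producer and could leave the others waiting even though space is available.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class for illustration only.
class DrainableQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    DrainableQueue(int capacity) {
        this.capacity = capacity;
    }

    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();
            }
            items.add(item);
            notEmpty.signal(); // one new item, so one consumer is enough
        } finally {
            lock.unlock();
        }
    }

    // Removes and returns every queued item in FIFO order.
    List<T> drain() {
        lock.lock();
        try {
            List<T> drained = new ArrayList<>(items);
            items.clear();
            notFull.signalAll(); // many slots freed at once, so wake every producer
            return drained;
        } finally {
            lock.unlock();
        }
    }
}
```

Each woken producer still rechecks `items.size() == capacity` in its loop, so waking more producers than there are free slots is harmless, just slightly wasteful.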
Summary
- `ReentrantLock` + `Condition` gives you explicit control over locking and waiting: multiple condition queues, interruptible and timed acquisition, and optional fairness.
- Prefer this approach over `synchronized` in scenarios with high throughput, multiple wait conditions, or where lock fairness and responsiveness matter.
- Use `signal()` for one-to-one signaling. Fall back to `signalAll()` only when you can't safely decide which thread should proceed.