DEV Community

faangmaster
faangmaster

Posted on • Updated on

Реализовать потокобезопасную блокирующую очередь на Java ограниченного размера

Задача.

Нужно реализовать Thread Safe (потокобезопасную) блокирующую очередь на Java ограниченного размера. В Java уже есть стандартные блокирующие очереди, которые наследуют интерфейс BlockingQueue, такие как ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue.
Но в текущей задаче их нельзя использовать. Нужно хранить элементы очереди, скажем в LinkedList и реализовать два метода: put и take.
put - добавляет новый элемент в конец очереди, если размер очереди достиг максимального размера - то поток, который вызвал put ждет на этом методе (блокируется), пока размер очереди не станет меньше.
take - удаляет элемент из головы очереди и возвращает его в качестве результата. Если очередь пустая, то поток, который вызвал take ждет на этом методе (блокируется), пока не появится новый элемент в очереди.

Решение.

Будем использовать wait-notify механизм в Java для реализации такой очереди.
Смотри также подобную задачу: Ping Pong

Краткая справка по wait-notify:

Что нужно знать про wait — вначале нужно получить лок/монитор на объект, на котором он вызывается. Более того, рекомендуется вызов wait помещать в цикл while (condition):

//получаем монитор на объект лок
synchronized(lock) {
  while (condition) {
    lock.wait();
  }
}
Enter fullscreen mode Exit fullscreen mode

Вначале получаем лок/монитор на объект, потом вызываем wait. wait переводит поток в режим ожидания. При этом он отпускает монитор, чтобы другой поток смог получить монитор на тот же самый объект и вызвать notify(), который разбудит наш поток. Когда наш поток разбудили, он попытается снова получить лок на объект. Если ему это удается, он проверит условие в цикле. Если оно изменилось, то он не выполнит wait снова и выполнение потока продолжится. Если же condition не изменилось, то он снова уйдет в сон. Дополнительный цикл нужен, чтобы удостовериться, что поток разбудили, когда действительно произошло, то, чего ждал поток. Иногда происходят спорадические пробуджения или кто-то вызвал notifyAll() и наш поток еще не должен быть разбужен. Детально смотрите: wait.

Что нужно знать про notify. Его тоже нужно вызывать после получения лока/монитора на объект, на котором мы хотим вызвать notify. Когда он вызван, то поток, который ждет на этом объекте будет разбужен.

Вернемся к задаче.

Элементы очереди будем хранить в LinkeList (или можно в ArrayDeque).
В методе put будем помещать новый элемент в конец списка list.add(t).
В методе take будем брать из начала списка list.removeFirst().

Чтобы достичь блокирования потока, когда мы вызываем метод put и размер очереди превышен, добавим в начало метода блок с wait:

while (list.size() >= maxSize) {
    wait();
}
Enter fullscreen mode Exit fullscreen mode

и сделаем метод put synchronized.
Т.к. метод synchonized, мы вначале получим лок на наш объект очереди. Далее проверим условие, достигнут или превышен максимальный размер очереди. Если да, то вызовем метод wait. При этом поток, который вызвал wait блокируется и отпускает лок на объект очереди. Чтобы другой поток смог получить лок на очередь, удалить из нее элемент и разбудить наш поток при помощи notifyAll(). Как только наш поток был разбужен, мы снова попытаемся получить лок, снова в цикле проверим условие. Если размер очереди стал меньше, то мы снова выполнять метод wait не будем. А, просто, добавим новый элемент в конец очереди.
Также в конце метода put добавим вызов notifyAll(), чтобы разбудить потоки, которые ждали на методе take().

Аналогично для метода take. Только там условие для ожидания будет другим: пока очередь пустая:

while (list.isEmpty()) {
    wait();
}
Enter fullscreen mode Exit fullscreen mode

Т.е. если мы хотели получить элемент из очереди, а очередь пустая - переходим в режим ожидания, пока нас не разбудят.
В конце метода также добавим вызов notifyAll(), чтобы разбудить потоки, которые ждали на put и уменьшения размера очереди.

Все вместе

public class MyBlockingQueue<T> {
    private final LinkedList<T> list;
    private final int maxSize;

    public MyBlockingQueue(int maxSize) {
        this.maxSize = maxSize;
        this.list = new LinkedList<>();
    }

    public synchronized void put(T t) throws InterruptedException
    {
        while (list.size() >= maxSize) {
            wait();
        }
        list.add(t);
        notifyAll();
    }

    public synchronized T take() throws InterruptedException {
        while (list.isEmpty()) {
            wait();
        }
        T head = list.removeFirst();
        notifyAll();
        return head;
    }
}

Enter fullscreen mode Exit fullscreen mode

Top comments (0)