Задача.
Реализовать потокобезопасную (Thread Safe) неблокирующую очередь на Java Т.е. нельзя использовать локи или synchonized, при этом она должна корректно работать в многопоточной среде.
Решение.
Задача аналогична Реализовать потокобезопасный неблокирующий стек на Java
Для решения будем использовать так называемые неблокирующие алгоритмы основанные на атомиках.
В основе неблокирующих алгоритмов лежит использование атомарной CAS (Compare-and-Swap или Compare-and-Set) операции. В Java для этого можно использовать атомики, которые поддерживают эти атомарные операции.
Основной частью неблокирующего алгоритма является использование конструкции вида:
do {
doing some changes...
} while (!atomicVariable.compareAndSet(our assumption))
Будем делать изменения в цикле и каждый раз проверять, что наши предположения все еще валидны и никто не изменил состояние объектов, к которым есть доступ из нескольких потоков.
Т.е. попробуем произвести изменения состояния объектов (не сохраняя результат в атомике) внутри тела цикла, а в условии цикла проверим, что никакой другой поток еще не изменил наш атомик. Если наш атомик не был изменен в другом потоке, то мы атомарно это проверим это и атомарно проапдейтим его значение.
Если другой поток уже атомарно изменил наш атомик, то мы не будем сохранять наш результат и попробуем проделать тоже самое на следующей итерации цикла. Это достигается благодаря атомарной операции compareAndSet на атомике. Смотрите задачу про стек в качестве более простого примера.
Но весь этот трюк с атомиками и неблокирующими алгоритмами работает хорошо в условии, когда у нас только один атомик. В случае со стеком - это была вершина стека. Оба метода push и pop меняли только вершину стека. В случае с очередью, нам нужно хранить ссылки на начало очереди и ее конец. Т.к. мы извлекаем из начала очереди, а добавляем в конец.
В таком случаем нам нужно уже иметь два атомика. Изменить два атомика одновременно, не использую локи, нельзя.
Более того, ситуация усложняется тем, что даже добавление в конец очереди не так просто реализовать. В случае со стеком, до атомарного обновления вершины стека, мы делали два действия:
- создавали новую вершину
- сетили next с этой новой вершины на старую вершину
И уже потом проверяли, что старую вершину никто не поменял в другом потоке, и обновляли атомарно ссылку на вершину.
Начальное состояние стека:
Первое действие - создание новой вершины:
Это действие никак не влият на стек.
Действие второе, сетим next на старую вершину:
Это действие также не влияет на стек.
И уже после этого в условии цикла пробуем атомарно изменить указатель на вершину стека. Это действие уже влияет на стек, но оно затрагивает только один атомик - вершину стека:
В случае с добавлением в конец очереди, нам также нужно совершить предварительно два действия:
- создать новую вершину
- обновить указатель next со старого хвоста очереди на новую вершину.
И уже потом обновить атомарно указатель на хвост очереди. Только тут проблема в том, что второе действие уже влияет на состояние очереди, т.к. мы обновляем указатель next с уже существующей вершины в очереди, а не с новой.
Начальное состояние очереди:
Первое действие - создание новой вершины, которая будет новым хвостом очереди. Это действие никак не влияет на очередь:
Второе действие - обновление next с текущего хвоста на новый. Уже эта операция влияет на состояние очереди:
И уже потом мы можем атомарно изменить ссылку на хвост очереди:
Т.к. в данном случае операция обновления next уже влияет на состоние очереди, нам нужно делать это атомарно. Поэтому ссылки next на следующую вершину также будем делать атомиками.
Т.е. даже просто операция добавления должна обновить два атомика: ссылку next и tail всей очереди.
Как же решить проблему того, что нам надо обновлять два атомика?
В таком случае нам нужно иметь некое промежуточное состояние очереди. Более того, нам нужно уметь расспознавать, что наша очередь в промежуточном состоянии из другого потока. И если мы пытаемся добавить элемент в конец очереди из двух потоков. Первый уже начал добавлять элемент, но еще не закончил, то второй поток должен уметь это расспознать. Если второй поток расспознал, что процесс добавления еще не завершен в первом потоке, то второй поток должен уметь завершать процесс добавления, инициированный первым, и попытаться снова вставить свой элемент в конец очереди на следующей итерации цикла.
В случае с добавлением в конец очереди, таким промежуточным состоянием будет состояние очереди, когда мы обновили ссылку next, но еще не обновили ссылку на хвост очереди:
Реализация
Давайте начнем с не Thread Safe реализации очереди на основе связаного списка.
Класс вершины списка:
class Node<E> {
E item;
Node<E> next;
public Node(E item, Node<E> next) {
this.item = item;
this.next = next;
}
}
Операция добавления в конец очереди:
private Node<E> tail;
public boolean put(E item) {
//Первое действие - создание вершины
Node<E> newNode = new Node<>(item, null);
//Второе действие - обновление next(промежуточное состояние)
if (tail != null) {
tail.next = newNode;
}
//Обновление хвоста очереди
tail = newNode;
return true;
}
Теперь давайте обновим эту реализацию до Thread Safe используя атомики.
Как мы уже выяснили ранее, второе действие (обновление next) уже влияет на очередь, поэтому нам нужно сделать next в классе Node атомиком, также сделаем его final:
class Node<E> {
final E item;
final AtomicReference<Node<E>> next;
public Node(E item, Node<E> next) {
this.item = item;
this.next = new AtomicReference<>(next);
}
}
Вторым ключевым атомиком у нас будет ссылка на хвост очереди:
private final AtomicReference<Node<E>> tail = ...;
Сделаем его final, чтобы при доступе из разных потоков мы всегда работали с одним и тем же атомиком и никто не мог подменить ссылку на другой атомик. Из-за того, что он final нам его нужно инициализировать. Вначале инициализируем его пустой вершиной, которая никуда не указывает и ничего не хранит:
private final Node<E> emptyNode = new Node<>(null, null);
private final AtomicReference<Node<E>> tail = new AtomicReference<>(emptyNode);
Теперь давайте напишем реализацию, используя эти атомики, без дополнительных проверок в каком мы сейчас состоянии:
private final Node<E> emptyNode = new Node<>(null, null);
private final AtomicReference<Node<E>> tail = new AtomicReference<>(emptyNode);
public boolean put(E item) {
//Первое действие - создание вершины
Node<E> newNode = new Node<>(item, null);
//Второе действие - обновление next(промежуточное состояние)
//compareAndSet выполнится только при условии,
//что next == null, т.е. его еще никто из
//другого потока не проапдейтил
Node<E> currentTail = tail.get();
currentTail.next.compareAndSet(null, newNode);
//Обновление хвоста очереди
//Выполнится только, если tail еще не поменяли в другом потоке
tail.compareAndSet(currentTail, newNode);
return true;
}
В данной реализации у нас нет никаких дополнительных проверок на то, в каком состоянии очередь в данных момент, а также у нас нет цикла, чтобы попробовать операцию еще раз, если наши предположения о состоянии очереди не валидны.
Добавим цикл для повторной попытки и проверки, что в процессе операции у нас не изменился next (что мы не в промежуточном состоянии):
private final Node<E> emptyNode = new Node<>(null, null);
private final AtomicReference<Node<E>> tail = new AtomicReference<>(emptyNode);
public boolean put(E item) {
Node<E> newNode = new Node<>(item, null);
//цикл, условия выхода из цикла будут в теле цикла
while(true) {
Node<E> currentTail = tail.get();
//Получаем ссылку на next
Node<E> tailNext = currentTail.next.get();
//проверяем, что хвост не успел поменяться
//из другого потока, иначе переходим к следующей
//итерации цикла (следующей попытки)
if (currentTail == tail.get()) {
//Проверяем, что ссылка next с хвоста еще не
//поменялась из другого потока
//(мы не в промежуточном состоянии)
if (tailNext == null) {
//Обновляем ссылку next,
//если она поменялась - переходим на следующую
//итерацию цикла (следующую попытку)
if (currentTail.next.compareAndSet(null, newNode)) {
//Обновляем хвост
tail.compareAndSet(currentTail, newNode);
return true;
}
}
}
}
}
Осталось добавить детектирование промежуточного состояния. И в случае если мы обнаружили, что очередь в промежуточном состоянии - мы может попробовать завершить начатую операцию в другом потоке и попробовать еще раз нашу операцию.
private final Node<E> emptyNode = new Node<>(null, null);
private final AtomicReference<Node<E>> tail = new AtomicReference<>(emptyNode);
public boolean put(E item) {
Node<E> newNode = new Node<>(item, null);
while(true) {
Node<E> currentTail = tail.get();
Node<E> tailNext = currentTail.next.get();
if (currentTail == tail.get()) {
if (tailNext != null) {
//Очередь в промежуточном состоянии
//Хвост еще не обновился,
//а next на хвосте уже обновилась
//Пробуем помочь другому потоку завершить операцию,
//путем обновления ссылки на хвост
//Если другой поток был быстрее,
//то это обновление не сработает
//и мы перейдем на следующую итерацию цикла
//для новой попытки
//Если сработает, то мы поможем первому потоку и
//сами также перейдем к следующей попытки вставить
//наш элемент в очередь
tail.compareAndSet(currentTail, tailNext);
} else {
if (currentTail.next.compareAndSet(null, newNode)) {
tail.compareAndSet(currentTail, newNode);
return true;
}
}
}
}
}
Все вместе:
public class NonBlockingThreadSafeQueue<E> {
private static class Node<E> {
final E item;
final AtomicReference<Node<E>> next;
public Node(E item, Node<E> next) {
this.item = item;
this.next = new AtomicReference<>(next);
}
}
private final Node<E> emptyNode = new Node<>(null, null);
private final AtomicReference<Node<E>> tail = new AtomicReference<>(emptyNode);
public boolean put(E item) {
Node<E> newNode = new Node<>(item, null);
while(true) {
Node<E> currentTail = tail.get();
Node<E> tailNext = currentTail.next.get();
if (currentTail == tail.get()) {
if (tailNext != null) {
//Очередь в промежуточном состоянии
//Хвост еще не обновился,
//а next на хвосте уже обновилась
//Пробуем помочь другому потоку завершить операцию,
//путем обновления ссылки на хвост
//Если другой поток был быстрее,
//то это обновление не сработает
//и мы перейдем на следующую итерацию цикла
//для новой попытки
//Если сработает, то мы поможем первому потоку и
//сами также перейдем к следующей попытки вставить
//наш элемент в очередь
tail.compareAndSet(currentTail, tailNext);
} else {
if (currentTail.next.compareAndSet(null, newNode)) {
tail.compareAndSet(currentTail, newNode);
return true;
}
}
}
}
}
}
Описанный выше алгоритм называется алгоритм Michael-Scott nonblocking linked-queue algorithm (Michael and Scott, 1996). Он используется в реализации ConcurrentLinkedQueue в стандартной библиотеке Java.
Top comments (0)