Charbel Sarkis

Posted on

# Priority Queue in python with a custom object

I've come across this because you can use this priority queue in the selection process in the A* algorithm (i.e the process where you choose the node with the lowest f score), to get an O(1) instead of the traditional O(n) solution.

Since we are choosing the node with the lowest score and the priority queue keeps the lowest value at the beginning, we can then achieve this O(1) solution.

Here is the first file
node.py

class Node(object):
def __init__(self, title, i, j):
self.title = title
self.i = str(i)
self.j = str(j)
self.f = 0
self.g = 0
self.h = 0
# no neighbors at first
self.neighbors = []

def __str__(self):
return '<' + self.title + '>'

def __repr__(self):
character = 'W' if self.title is '#' else 'O'
return '<' + self.i + ' ' + self.j + ' ' + self.title + ', ' + str(len(self.neighbors))+'>'

Don't mind the unnecessary code.

Here is the second and final file
priority_item.py

from dataclasses import dataclass, field
from typing import Any
from queue import PriorityQueue
from node import Node

@dataclass(order=True)
class PrioritizedItem:
priority: int
item: object = field()

q = PriorityQueue()
n = Node('a', 0, 0)
n.f = 1
n2 = Node('b', 0, 0)
n2.f = 0
n3 = Node('c', 0, 0)
n3.f = 2
q.put(PrioritizedItem(n.f, n))
q.put(PrioritizedItem(n2.f, n2))
q.put(PrioritizedItem(n3.f, n3))
print(q.get())

In this file I create a new PrioritizedItem which will encompass my custom object.

Below it I make 3 new node objects just as a test where the second has the lowest f score.

When inserting these custom nodes into the list, you wrap it in the PrioritizedItem and pass the value that you want to be used as the priority as the first argument, and then the priority queue will sort them from lowest to highest. This is because the PriorityQueue uses a min-heap data structure in its underlying implementation.

Parisa Khedri

Hi Charbel, and thank you for sharing this post.

I'm currently trying to find a safe path for the units in Starcraft II. I have implemented an A* algorithm that does this. Because my solution was very slow, I did some research and came across your post.

I have now created the PrioritizedItem. I'm wondering if there is a way to check if an element is in the queue without looping. Right now I have:

if any(neighbor == item.item for item in open_queue.queue):

It seems that it takes too much time to iterate through the open_queue, and because I have the if statement repeated for each neighbor, it takes even more time.