DEV Community 👩‍💻👨‍💻

Charbel Sarkis
Charbel Sarkis

Posted on

Priority Queue in python with a custom object

I've come across this because you can use this priority queue in the selection process in the A* algorithm (i.e the process where you choose the node with the lowest f score), to get an O(1) instead of the traditional O(n) solution.

Since we are choosing the node with the lowest score and the priority queue keeps the lowest value at the beginning, we can then achieve this O(1) solution.

Here is the first file
node.py

class Node(object):
    def __init__(self, title, i, j):
        self.title = title
        self.i = str(i)
        self.j = str(j)
        self.f = 0
        self.g = 0
        self.h = 0
        # no neighbors at first
        self.neighbors = []

    def __str__(self):
        return '<' + self.title + '>'

    def __repr__(self):
        character = 'W' if self.title is '#' else 'O'
        return '<' + self.i + ' ' + self.j + ' ' + self.title + ', ' + str(len(self.neighbors))+'>'
Enter fullscreen mode Exit fullscreen mode

Don't mind the unnecessary code.

Here is the second and final file
priority_item.py

from dataclasses import dataclass, field
from typing import Any
from queue import PriorityQueue
from node import Node


@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: object = field()

q = PriorityQueue()
n = Node('a', 0, 0)
n.f = 1
n2 = Node('b', 0, 0)
n2.f = 0
n3 = Node('c', 0, 0)
n3.f = 2
q.put(PrioritizedItem(n.f, n))
q.put(PrioritizedItem(n2.f, n2))
q.put(PrioritizedItem(n3.f, n3))
print(q.get())
Enter fullscreen mode Exit fullscreen mode

In this file I create a new PrioritizedItem which will encompass my custom object.

Below it I make 3 new node objects just as a test where the second has the lowest f score.

When inserting these custom nodes into the list, you wrap it in the PrioritizedItem and pass the value that you want to be used as the priority as the first argument, and then the priority queue will sort them from lowest to highest. This is because the PriorityQueue uses a min-heap data structure in its underlying implementation.

Top comments (1)

Collapse
 
parisa_khedri_bb6c3a573a0 profile image
Parisa Khedri

Hi Charbel, and thank you for sharing this post.

I'm currently trying to find a safe path for the units in Starcraft II. I have implemented an A* algorithm that does this. Because my solution was very slow, I did some research and came across your post.

I have now created the PrioritizedItem. I'm wondering if there is a way to check if an element is in the queue without looping. Right now I have:

if any(neighbor == item.item for item in open_queue.queue):
Enter fullscreen mode Exit fullscreen mode

It seems that it takes too much time to iterate through the open_queue, and because I have the if statement repeated for each neighbor, it takes even more time.

Thanks in advance, Parisa

About Real-time

Join DEV and MongoDB to build a front-end application using MongoDB Atlas. Change streams to display live updates as your database changes for your entry in the DEV x MongoDB Atlas Hackathon 2022.

Join the Hackathon