loading...

Writing a Minimum-Heap in Python3

therealdarkmage profile image darkmage ・6 min read

Before I get started, if you need some background on what a heap is:

https://en.wikipedia.org/wiki/Heap_(data_structure)

This discussion is about a minimum heap.


I recently spent the last week helping one of my paying students to write a minimum-heap data structure in Python3.

Given the following class definition for a Node:

class Node:
    """
    Class definition shouldn't be modified in anyway
    """
    __slots__ = ('_key', '_val')

    def __init__(self, key, val):
        self._key = key
        self._val = val

    def __lt__(self, other):
        return self._key < other._key or (self._key == other._key and self._val < other._val)

    def __gt__(self, other):
        return self._key > other._key or (self._key == other._key and self._val > other._val)

    def __eq__(self, other):
        return self._key == other._key and self._val == other._val

    def __str__(self):
        return '(k: {0} v: {1})'.format(self._key, self._val)

    __repr__ = __str__

    @property
    def val(self):
        """
        :return: underlying value of node
        """
        return self._val

Be able to define the missing methods on a Minimum-Heap data structure:

class Heap:
    """
    Class definition is partially completed.
    Method signatures and provided methods may not be edited in any way.
    """
    __slots__ = ('_size', '_capacity', '_data')

    def __init__(self, capacity):
        self._size = 0
        self._capacity = capacity + 1  # additional element space for push
        self._data = [None for _ in range(self._capacity)]

    def __str__(self):
        return ', '.join(str(el) for el in self._data if el is not None)

    __repr__ = __str__

    def __len__(self):  # allows for use of len(my_heap_object)
        return self._size

#    DO NOT MODIFY ANYTHING ABOVE THIS LINE
#    Start of Student Modifications

    def _percolate_up(self):
        pass

    def _percolate_down(self):
        pass

    def _min_child(self, i):
        pass

    def push(self, key, val):
        pass

    def pop(self):
        pass

    @property  # do not remove
    def empty(self):
        pass

    @property  # do not remove
    def top(self):
        pass

    @property  # do not remove
    def levels(self):
        pass

Further, there is a function we wish to write outside of the Heap class itself, def most_common_x(vals, X):, which will take a list of values, vals, and then return a set of the X most common values in the list.

def most_common_x(vals, X):
    pass

I'm going to walk through each method, one by one, just as I did with my student.

It is important to test your code as you go along, and to not just rely on automatic testing solutions. Know how to manually test and verify that the output of a method is what you want before assuming that something works.

Let's begin!


push()

push() depends on _percolate_up(), otherwise we could just write:

def push(self, key, val):
        self._data[self._size] = Node(key,val)
        self._size += 1

and be done.

This much would allow us to create a really simple heap:

h = Heap(4)
h.push(4, 'a')
h.push(2, 'b')
h.push(3, 'c')
h.push(1, 'd')

At this point, we should start looking at _percolate_up().


_percolate_up()

_percolate_up() is all about swapping the bottom node with its parent nodes until its parents are no longer greater than it.

    4
   / \
  2   3
 /
1

Take the heap above. If we were to call _percolate_up() on it, it should result in a heap that looks more like:

    1
   / \
  2   3
 /
4
  • So, we'd need to start at the end of the heap.
  • Lets create an index and set it equal to the last valid index in our data list.
  • Now, lets loop through our list until we've reached the root node:
    • Grab the node for our current index. This is child.
    • Calculate the parentIndex, which is the index i minus 1 divided by 2 as a whole int.
    • Grab the parent node given its index.
    • If both child and parent are not None:
      • If the child is less than the parent:
        • Swap them
    • Update the index to point to parent
def _percolate_up(self):
    i = self._size - 1
    while i/2 > 0:
        child = self._data[i]
        parentIndex = (i-1)//2
        parent = self._data[parentIndex]
        if child is not None and parent is not None:
            if child < parent:
                tmp = parent
                self._data[parentIndex] = child
                self._data[i] = tmp 
        i = parentIndex

_percolate_down()

Turns out that push() also depends on pop(), which depends on _percolate_down().

Turns out that _percolate_down() depends on _min_child()!


_min_child()

This one is just a matter of patiently handling each case.

  1. If empty
  2. If the "child" index is out-of-bounds
  3. If the given index has no Node
  4. If node has no children
  5. If node has only left child
  6. If node has only right child
  7. If node has both children

This actually depends on empty() but we'll do that in a minute.

def _min_child(self, i):
    if self.empty:
        return -1
    if i*2+1 > self._size:
        return -1
    if self._data[i] is None:
        return -1
    left = i*2+1
    right = i*2+2
    left_val = self._data[left]
    right_val = self._data[right]
    if left_val is None and right_val is None:
        return -1
    if left_val is not None and right_val is None:
        return left
    if left_val is None and right_val is not None:
        return right
    if self._data[left] < self._data[right]:
        return left
    return right

back to _percolate_down()

So, this time, stepping through branches of the tree in the opposite way to _percolate_up():

  • While we're not at the end of the heap:
    • Grab the minimum child
    • If valid and the current node is greater than the minimum child:
      • Swap them
    • Update the index to point to the minimum child's index
def _percolate_down(self):
    i = 0
    while i*2 <= self._size-1:
        mc = self._min_child(i)
        if mc == -1:
            break
        if self._data[i] > self._data[mc]:
            tmp = self._data[i]
            self._data[i] = self._data[mc]
            self._data[mc] = tmp
        i = mc

empty()

Super easy let's knock it out:

def empty(self):
    if self._size == 0:
        return True
    return False

pop()

Now that we've written _percolate_down() and empty(), we can take care of this one.

  • If the heap is empty, return None
  • Our return-value is just the head node, or the front of our data list
  • Swap the front node with the end of our heap / the last value / node
  • Decrement size
  • Clear the last element (effectively we swap the smallest and biggest elements, nullify the last node, then...)
  • Percolate the head node down to its new position
def pop(self):
    if self.empty:
        return None
    rv = self._data[0].val
    self._data[0] = self._data[self._size-1]
    self._size -= 1
    self._data[self._size] = None
    self._percolate_down()
    return rv

return to push()

With both pop() and _percolate_up() taken care of, it is time we took care of push():

  • Create a new Node in the last position in our heap
  • Increment heap-size
  • Percolate the new node up into its appropriate position
  • If we've reached capacity, pop the smallest node out
def push(self, key, val):
    self._data[self._size] = Node(key,val)
    self._size += 1
    self._percolate_up()
    if self._size == self._capacity:
        self.pop()

top()

Also super-easy. Just grab the top node so long as heap is not empty.

def top(self):
    if self.empty is False:
        return self._data[0].val
    return None

levels()

This one is a bit longer.

Levels is to return a list-of-lists, with each heap-level's nodes in each list (so, one list per level).

Edge-case 1: Empty heap. Return an empty list.

  • Start at the beginning of the heap
  • Create a new list for this level
  • If it is the first list we are adding to:
    • Add the root node to the levelOne list
    • Add the levelOne list to our return list
  • Else:
    • Calculate the indices of the startIndex and endIndex of this level in our heap's data list
    • Add each node to our levelList
    • If the levelList is not empty, add it to our return list
def levels(self):
    ourList = []
    if self.empty:
        return ourList
    prevStartIndex = 0
    while prevStartIndex < len(self._data):
        thisLevel = []
        if len(ourList) == 0:
            thisLevel.append(self._data[0])
            prevStartIndex = 0
            ourList.append(thisLevel)
        else:
            startIndex = (prevStartIndex * 2) + 1
            endIndex = startIndex * 2
            j = startIndex 
            while j < len(self._data) and j <= endIndex:
                node = self._data[j]
                if node is not None:
                    thisLevel.append(self._data[j])
                j += 1
            prevStartIndex = startIndex 
            if len(thisLevel) != 0:
                ourList.append(thisLevel)
    return ourList 

Lastly, the function outside of the Heap class:

most_common_x(vals, X)

  • Create an empty dictionary
  • For each element in vals:
    • Count it in the dictionary by incrementing its value for an element key
  • Create a new Heap
  • Given the dictionary keys:
    • Grab the value off the dictionary for each key, this is the number of times that element appeared in the list vals
    • Push it onto the heap
  • Pop off enough nodes such that remaining Nodes are X in count
  • Add remaining nodes to a return set

Code with some comments:

def most_common_x(vals, X):

    # step 1 - build the dictionary
    d = {}

    for element in vals:
        # check to see if there is a count for our given element
        # if not, return 0 for default
        count = d.get(element, 0)

        # increment count
        count += 1

        # re-assign new count back to dictionary
        d[element] = count

    # step 2 - build the heap from the dictionary
    d_keys = d.keys()
    heapSize = len(d_keys)
    heap = Heap(heapSize)

    for key in d_keys:
        count = d[key]
        heap.push(count, key)

    # step 3 - grab the leaf nodes and add to our return set
    returnSet = set()
    while len(heap) > X:
        heap.pop()

    while not heap.empty:
        returnSet.add(heap.pop())

    return returnSet

This is, admittedly, a quick write-up, but I can always flesh it out further if you need clarification, just ask! :D


If you need a Computer Science tutor, code reviewer, or just someone to pair program with, hit me up

Posted on by:

therealdarkmage profile

darkmage

@therealdarkmage

Computer Science Tutor, Programmer, Gamedev, Hacker, Bug Bounty Hunter

Discussion

markdown guide
 

You might want to look at my code. It is in C#, but I have a generic heap, and all you need to make a heap is a compare function. You don't need two values when you add to a heap. You can have multiple heaps of the same type but they can use different priority algorithms based on the delegate you pass in on construction.
github.com/ZacharyPatten/Towel

 

This was a student of mine's homework project. His professor's specifications required the Heap to store Node objects with key-values on them. I understand that you can do this many different ways, but thank you for the clarification. I like the idea of initializing with a different delegate to indicate algorithm changes :)

 

If you have to store keys and values, all you do is (in C# using my example) this:

Heap<(KeyType, ValueType)>

So there should only ever be one generic type, even if you need to store multiple values, you just pass in a type with multiple values.

Just wanted to clarify that. Let me know if that doesn't make sense and I will gladly elaborate.

But yeah. Hope the student had fun figuring out the algorithms of a heap. :)

Yeah, your template looks a lot like the templating stuff I'd use in C++ and Java (I don't know C# but I understand it is similar enough).

We both had fun working through the methods :D It is definitely good practice.