Indexed Minimum Priority Queue

Join me to stay up-to-date and get my new articles delivered to your inbox by subscribing here.

October 28, 2022

Data Structures & Algorithms 

Motivation

As stated here, a priority queue is an abstract data type similar to a regular queue or stack data structure in which each element additionally has a priority associated with it.

The most common functions of priority queues are push and pop operations as same as regular queues.

The time complexities of push and pop functions of regular queues are Θ(1). If implemented as binary heaps, the time complexities of push and pop operations of priority are Θ(log N).

Then why we are using priority queues if there are better options? 🙂 Sometimes we want to access the minimum element, pop it, push other items, and pop the minimum item again, …

If we try to solve this case (accessing the minimum element whenever needed) with regular queues, we should pop all items, and push all items (except minimum valued) back. This is very costly. (Θ(N))

Let’s be greedy. We need such a data structure, it supports the usual push and pop (delete-the-minimum) operations, along with delete and update methods. And it should support those operations with Θ(log N) time complexity. 🤔

Why do we need it? Review the effective algorithm/implementation of Dijkstra’s shortest path.

You can also review a python implementation of Dijkstra’s shortest path.

The original resource of indexed minimum priority queues is here.

https://algs4.cs.princeton.edu/lectures/keynote/43MinimumSpanningTrees.pdf

Python Implementation

This implementation uses a binary heap along with some helper data structures (pq, qp, and keys). The push, pop (delete-the-minimum), delete, and update operations take Θ(log n) time in the worst case, where n is the number of elements in the priority queue.

  • pq -> index of the key in heap position i
  • qp -> heap position of the key with index i
  • keys -> keeps priority values of keys
class INDEXED_MINIMUM_PRIORITY_QUEUE:
  def __init__(self):
    self.qp = {}
    self.pq = []
    self.keys = {}

  def __exch(self, i, j):
    self.pq[i], self.pq[j] = self.pq[j], self.pq[i]
    self.qp[self.pq[i]] = i
    self.qp[self.pq[j]] = j
  
  def __greater(self, i, j):
    return self.keys[self.pq[i]] > self.keys[self.pq[j]]
  
  def __left_child_index(self, k):
    return 2 * k + 1
  
  def __right_child_index(self, k):
    return 2 * k + 2
  
  def __parent_index(self, k):
    return (k - 1) // 2
  
  def __sink(self, k):
    while self.__left_child_index(k) < self.len():
      j = self.__left_child_index(k)
      if self.__right_child_index(k) < self.len() and self.__greater(j, j + 1): j = self.__right_child_index(k)
      if not self.__greater(k, j): break
      self.__exch(k, j)
      k = j

  def __swim(self, k):
    while (k and self.keys[self.pq[self.__parent_index(k)]] > self.keys[self.pq[k]]):
      self.__exch(k, self.__parent_index(k))
      k = self.__parent_index(k)

  def __str__(self):
    return str(self.keys)

  def len(self):
    return len(self.pq)

  def contains(self, key):
    return key in self.keys

  def push(self, key, val):
    if self.contains(key):
      self.update(key, val)
    else:
      self.qp[key] = self.len()
      self.keys[key] = val
      self.pq.append(key)
      self.__swim(self.len() - 1)

  def get_val(self, key):
    if not self.contains(key): raise KeyError
    return self.keys[key]

  def top(self):
    if self.is_empty(): raise IndexError
    return self.pq[0], self.get_val(self.pq[0])

  def pop(self):
    key, val = self.top()
    self.delete(key)
    return key, val

  def delete(self, key):
    if not self.contains(key): raise KeyError
    index = self.qp[key]
    self.__exch(index, self.len() - 1)
    self.__swim(index)
    self.pq.pop()
    self.__sink(index)
    self.keys.pop(key)
    self.qp.pop(key)

  def update(self, key, val):
    if not self.contains(key): raise KeyError
    self.keys[key] = val
    self.__swim(self.qp[key])
    self.__sink(self.qp[key])

  def is_empty(self):
    return self.len() == 0

Testing

import random
import string

pq = INDEXED_MINIMUM_PRIORITY_QUEUE()
test_dict = {}

def validate():
  assert(pq.len() == len(test_dict))
  for key,val in test_dict.items():
    try:
      assert(pq.contains(key))
      val_pq = pq.get_val(key)
      assert(val == val_pq)
    except:
      print(pq)
      print(test_dict)
      raise

for i in range(1000000):
  match random.randint(0, 10):
    case 0:
      if pq.is_empty(): continue
      match random.randint(0, 2):
        case 0:
          key_pq, val_pq = pq.pop()
          test_dict.pop(key_pq)
        case 1:
          key, val = random.choice(list(test_dict)), random.randint(1, 999)
          pq.update(key, val)
          test_dict[key] = val
        case 2:
          key = random.choice(list(test_dict))
          pq.delete(key)
          test_dict.pop(key)
    case _:
      key, val = random.choice(string.ascii_letters), random.randint(1, 999)
      pq.push(key, val)
      test_dict[key] = val
  validate()