Join me to stay up-to-date and get my new articles delivered to your inbox by subscribing here.
As stated here, a priority queue is an abstract data type similar to a regular queue or stack data structure in which each element additionally has a priority associated with it.
The most common functions of priority queues are push and pop operations as same as regular queues.
The time complexities of push and pop functions of regular queues are Θ(1). If implemented as binary heaps, the time complexities of push and pop operations of priority are Θ(log N).
Then why we are using priority queues if there are better options? 🙂 Sometimes we want to access the minimum element, pop it, push other items, and pop the minimum item again, …
If we try to solve this case (accessing the minimum element whenever needed) with regular queues, we should pop all items, and push all items (except minimum valued) back. This is very costly. (Θ(N))
Let’s be greedy. We need such a data structure, it supports the usual push and pop (delete-the-minimum) operations, along with delete and update methods. And it should support those operations with Θ(log N) time complexity. 🤔
Why do we need it? Review the effective algorithm/implementation of Dijkstra’s shortest path.
You can also review a python implementation of Dijkstra’s shortest path.
The original resource of indexed minimum priority queues is here.
This implementation uses a binary heap along with some helper data structures (pq, qp, and keys). The push, pop (delete-the-minimum), delete, and update operations take Θ(log n) time in the worst case, where n is the number of elements in the priority queue.
class INDEXED_MINIMUM_PRIORITY_QUEUE:
def __init__(self):
self.qp = {}
self.pq = []
self.keys = {}
def __exch(self, i, j):
self.pq[i], self.pq[j] = self.pq[j], self.pq[i]
self.qp[self.pq[i]] = i
self.qp[self.pq[j]] = j
def __greater(self, i, j):
return self.keys[self.pq[i]] > self.keys[self.pq[j]]
def __left_child_index(self, k):
return 2 * k + 1
def __right_child_index(self, k):
return 2 * k + 2
def __parent_index(self, k):
return (k - 1) // 2
def __sink(self, k):
while self.__left_child_index(k) < self.len():
j = self.__left_child_index(k)
if self.__right_child_index(k) < self.len() and self.__greater(j, j + 1): j = self.__right_child_index(k)
if not self.__greater(k, j): break
self.__exch(k, j)
k = j
def __swim(self, k):
while (k and self.keys[self.pq[self.__parent_index(k)]] > self.keys[self.pq[k]]):
self.__exch(k, self.__parent_index(k))
k = self.__parent_index(k)
def __str__(self):
return str(self.keys)
def len(self):
return len(self.pq)
def contains(self, key):
return key in self.keys
def push(self, key, val):
if self.contains(key):
self.update(key, val)
else:
self.qp[key] = self.len()
self.keys[key] = val
self.pq.append(key)
self.__swim(self.len() - 1)
def get_val(self, key):
if not self.contains(key): raise KeyError
return self.keys[key]
def top(self):
if self.is_empty(): raise IndexError
return self.pq[0], self.get_val(self.pq[0])
def pop(self):
key, val = self.top()
self.delete(key)
return key, val
def delete(self, key):
if not self.contains(key): raise KeyError
index = self.qp[key]
self.__exch(index, self.len() - 1)
self.__swim(index)
self.pq.pop()
self.__sink(index)
self.keys.pop(key)
self.qp.pop(key)
def update(self, key, val):
if not self.contains(key): raise KeyError
self.keys[key] = val
self.__swim(self.qp[key])
self.__sink(self.qp[key])
def is_empty(self):
return self.len() == 0
import random
import string
pq = INDEXED_MINIMUM_PRIORITY_QUEUE()
test_dict = {}
def validate():
assert(pq.len() == len(test_dict))
for key,val in test_dict.items():
try:
assert(pq.contains(key))
val_pq = pq.get_val(key)
assert(val == val_pq)
except:
print(pq)
print(test_dict)
raise
for i in range(1000000):
match random.randint(0, 10):
case 0:
if pq.is_empty(): continue
match random.randint(0, 2):
case 0:
key_pq, val_pq = pq.pop()
test_dict.pop(key_pq)
case 1:
key, val = random.choice(list(test_dict)), random.randint(1, 999)
pq.update(key, val)
test_dict[key] = val
case 2:
key = random.choice(list(test_dict))
pq.delete(key)
test_dict.pop(key)
case _:
key, val = random.choice(string.ascii_letters), random.randint(1, 999)
pq.push(key, val)
test_dict[key] = val
validate()