Heaps & Priority Queues

Questions to ask when determining whether a heap fits your problem
  • Is tracking the max/min a core issue of the problem? If so, a heap is a good bet, as it allows O(1) lookups of the max/min and O(log n) insertions.
  • Do you have to keep track of a median value? If so, two heaps can be used to hold the lower and upper sets.
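
A minimal sketch of the two-heap median idea, assuming Python's heapq. The class name RunningMedian and its method names are made up for illustration. The lower half is stored negated so that a min-heap can act as a max-heap.

```python
import heapq


class RunningMedian:
    """Track the median of a stream with two heaps (illustrative names)."""

    def __init__(self):
        self.lower = []  # max-heap via negation: items at or below the median
        self.upper = []  # min-heap: items above the median

    def add(self, value):
        # Route the value through lower so that every item in lower
        # stays <= every item in upper.
        heapq.heappush(self.lower, -value)
        heapq.heappush(self.upper, -heapq.heappop(self.lower))
        # Rebalance: lower holds as many items as upper, or one more.
        if len(self.upper) > len(self.lower):
            heapq.heappush(self.lower, -heapq.heappop(self.upper))

    def median(self):
        # Assumes at least one value has been added.
        if len(self.lower) > len(self.upper):
            return -self.lower[0]
        return (-self.lower[0] + self.upper[0]) / 2
```

Each add costs O(log n) and median is O(1), since it only peeks at the two roots.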

[! SUCCESS] Working with heaps

  • Heaps are particularly useful when all you care about is the largest or smallest element(s).
  • k largest or k smallest type problems are well-suited for heaps.

Relevant problems

  • Stock Price Fluctuation. Implement a class that updates and reports the max/min of stock data streamed out of order. Use heaps to keep track of the max and min values.
  • Find Median from Data Stream. Use two heaps, one for items at or below the median, the other for items above the median. Remember to rebalance correctly.
  • K largest or K smallest. Use a min- or max-heap, writing a custom comparator if needed. In Python, negate the keys and push tuples so the heap orders by magnitude while carrying another value alongside.
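
One possible sketch of the Stock Price Fluctuation idea. The method names (update, current, maximum, minimum) are assumed here, and the lazy-deletion trick is one common approach rather than something these notes spell out: stale heap entries are left in place and discarded only when they surface at the top.

```python
import heapq


class StockPrice:
    """Sketch only: interface and lazy deletion are assumptions."""

    def __init__(self):
        self.prices = {}    # timestamp -> most recent price for that timestamp
        self.latest = 0     # largest timestamp seen so far
        self.min_heap = []  # (price, timestamp)
        self.max_heap = []  # (-price, timestamp)

    def update(self, timestamp, price):
        # A repeated timestamp is a correction; old heap entries become stale.
        self.prices[timestamp] = price
        self.latest = max(self.latest, timestamp)
        heapq.heappush(self.min_heap, (price, timestamp))
        heapq.heappush(self.max_heap, (-price, timestamp))

    def current(self):
        return self.prices[self.latest]

    def maximum(self):
        # Drop entries whose price no longer matches the recorded one.
        while self.prices[self.max_heap[0][1]] != -self.max_heap[0][0]:
            heapq.heappop(self.max_heap)
        return -self.max_heap[0][0]

    def minimum(self):
        while self.prices[self.min_heap[0][1]] != self.min_heap[0][0]:
            heapq.heappop(self.min_heap)
        return self.min_heap[0][0]
```

The sketch assumes update is called at least once before any query.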

Fundamentals

  • Special type of binary tree
  • Always a complete binary tree
  • Max-heap: The value of the parent is greater than or equal to the values of its children, i.e., the root holds the max key
  • Min-heap: The value of the parent is less than or equal to the values of its children, i.e., the root holds the min key
  • A priority queue is an abstract data type, existing on a theoretical level. It needs a concrete data structure to be realized. A heap is often used as the concrete data structure underlying the implementation of a priority queue.
  • Max/min heaps support O(log n) insertions and deletions of the max/min item, and O(1) lookups of max/min values, respectively.
  • Searching for an arbitrary key has O(n) time complexity
  • Can be represented as arrays (because it is a complete binary tree) or BinaryNode type.
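
To make the array representation concrete, here is a small sketch with hypothetical helper names. It assumes a 0-indexed list, where the children of index i sit at 2i + 1 and 2i + 2. The push shows why insertion is O(log n): the new item only travels along one root-to-leaf path.

```python
def parent(i):
    return (i - 1) // 2


def children(i):
    return 2 * i + 1, 2 * i + 2


def is_min_heap(items):
    # Every non-root item must be >= its parent.
    return all(items[parent(i)] <= items[i] for i in range(1, len(items)))


def sift_up(items, i):
    """Restore the min-heap property after placing a new item at index i."""
    while i > 0 and items[i] < items[parent(i)]:
        items[i], items[parent(i)] = items[parent(i)], items[i]
        i = parent(i)


def push(items, value):
    # Append at the end (keeps the tree complete), then sift up.
    items.append(value)
    sift_up(items, len(items) - 1)
```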

Python

  • import heapq brings in the standard heap module in the Python ecosystem.
  • This module's core functions implement a min-heap, so to make a max-heap, negate the values.
  • Just use the heap functions on a list directly, no need for a new data type.
  • Important methods on heapq are:
    • heapq.heapify(list) transforms a populated list into a heap in-place.
    • heapq.nlargest(n, iterable) or heapq.nsmallest(n, iterable) returns a list of the n largest or smallest elements
    • heapq.heappush(heap, a) pushes element a onto heap
    • heapq.heappop(h) pops smallest element from heap
    • heapq.heappushpop(h, a) pushes a onto heap and then pops smallest element
    • o = h[0] peeks smallest element without removing
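
A short walkthrough of the functions listed above, using a made-up input list. The comments show the values each call yields for this particular input.

```python
import heapq

nums = [5, 1, 8, 3, 2]

heap = list(nums)
heapq.heapify(heap)                    # in place; returns None
smallest = heap[0]                     # peek without removing: 1
heapq.heappush(heap, 0)
popped = heapq.heappop(heap)           # 0
swapped = heapq.heappushpop(heap, 4)   # pushes 4, then pops the smallest: 1

top_two = heapq.nlargest(2, nums)      # [8, 5]
bottom_two = heapq.nsmallest(2, nums)  # [1, 2]

# Max-heap by negation: store -x, negate again when reading.
max_heap = [-n for n in nums]
heapq.heapify(max_heap)
largest = -heapq.heappop(max_heap)     # 8
```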

C++

C++ Tip

When in doubt, use a priority_queue.

  • Use make_heap to convert an existing vector into a heap, in place. It has fewer restrictions and is therefore more flexible but less safe.
  • The priority_queue wraps the make_heap call, creating its own data container, typically a vector, within.

To create a priority queue with a custom comparator

cpp
auto comp = [&](int a, int b){ return map[a] < map[b]; };
std::priority_queue<int, vector<int>, decltype(comp)> heap(comp);

// Then use a loop with
heap.push(item); // Adds item
heap.top();      // Reads top item
heap.pop();      // Removes top item

Rust

The standard library heap implementation is exposed as std::collections::BinaryHeap. It is a max-heap, so if you need a min-heap you have to multiply entries by -1 or wrap them in std::cmp::Reverse.

rust
let mut heap = BinaryHeap::with_capacity(k as usize + 1);
// Multiply each new entry by -1 for a min-heap
vec.into_iter().for_each(|x| {
	heap.push((-1 * x.1, x.0));
});

Problems

10.1 merge sorted arrays

Tough problem, but foundational. Do review. Important logic about how push and pop actually mutate the heap. The core solution is to build a heap from the first element of each sub-array, then repeatedly pop the min into the output result and replace it with the next element from the array it came from.
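
A minimal sketch of that merge, assuming the input is a list of already-sorted lists. The function name and the (value, array index, element index) tuple layout are choices made for this example.

```python
import heapq


def merge_sorted_arrays(arrays):
    # Seed the heap with the first element of each non-empty array.
    heap = [(arr[0], i, 0) for i, arr in enumerate(arrays) if arr]
    heapq.heapify(heap)

    result = []
    while heap:
        value, i, j = heapq.heappop(heap)
        result.append(value)
        # Replace the popped entry with the next element from the same array.
        if j + 1 < len(arrays[i]):
            heapq.heappush(heap, (arrays[i][j + 1], i, j + 1))
    return result
```

The heap never holds more than one entry per array, so with k arrays and n total elements the merge runs in O(n log k) time.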

Lessons Learned
  • When you push an entry onto the heap, or when you pop, it takes O(log n) time because the entry has to be sifted up or down the tree to restore the heap property.
  • You don't have to push values in the correct order. That's the whole point of the heap: it places each value in a valid location efficiently.
  • There are typically several correct ways to order items in a heap

10.3 Sort an almost sorted array

Good work! Solved this problem in ~20 minutes, exactly as the authors did. You also correctly evaluated the time and space complexity! Strong all around.

Lessons Learned
  • heapify converts a list into a heap in-place, it doesn't return a heap.
  • Otherwise straightforward brute force solution.
  • "heapreplace(a, x) returns the smallest value originally in a regardless of the value of x, while, as the name suggests, heappushpop(a, x) pushes x onto a before popping the smallest value." - stackoverflow
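
A sketch of one way to solve this with heappushpop, assuming every element is at most k positions away from its sorted position. The function name is made up, and this is not necessarily the exact code written for the problem.

```python
import heapq
from itertools import islice


def sort_almost_sorted(sequence, k):
    it = iter(sequence)
    # Start with the first k items; heapify works in place.
    min_heap = list(islice(it, k))
    heapq.heapify(min_heap)

    result = []
    for x in it:
        # With x included, the heap covers k + 1 candidates, so its
        # smallest item is the next one in sorted order.
        result.append(heapq.heappushpop(min_heap, x))
    # Drain whatever remains.
    while min_heap:
        result.append(heapq.heappop(min_heap))
    return result
```

The heap never exceeds k + 1 items, giving O(n log k) time and O(k) extra space.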

10.4 Compute k closest stars

This was trivial given the heapq.nsmallest(n, iterable) function. The key takeaway the authors are trying to get at is that if you cannot store all of the stars' coordinates in RAM, you have to be smarter about how you feed data into the heap being searched over. The solution, which isn't super difficult, follows very similar logic to the problem before: keep a max-heap of at most k elements, loop over the input, push each new element, and pop the farthest one whenever the heap grows past k.

Lessons Learned
  • It is important to recall that Python's heapq doesn't implement a max-heap, so you have to take care to store and compare negated values in the heap (thus making it a max-heap over the original values), then multiply them by -1 before returning. To avoid recomputing distances, at the cost of some space, you could make the heap items tuples that pair a negative distance with the actual target object. For this, ensure the target type implements def __lt__(self, other), since tuples fall back to comparing the second element when distances tie.
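
As a variant on the negation approach, the sketch below gets max-heap behaviour by reversing __lt__ on a hypothetical Star class, so heapq keeps the farthest star at index 0. The class, its fields, and the function name are assumptions for illustration. The input can be a generator, so the full data set never has to sit in memory.

```python
import heapq
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class Star:
    x: float
    y: float
    z: float

    @property
    def distance(self):
        return math.sqrt(self.x**2 + self.y**2 + self.z**2)

    def __lt__(self, other):
        # Reversed on purpose: the "smallest" star for heapq is the farthest.
        return self.distance > other.distance


def k_closest_stars(stars, k):
    max_heap = []
    for star in stars:  # stars may be a generator
        heapq.heappush(max_heap, star)
        if len(max_heap) > k:
            heapq.heappop(max_heap)  # drop the farthest star seen so far
    return sorted(max_heap, key=lambda s: s.distance)
```

Reversing __lt__ changes how Star objects compare everywhere, not just inside the heap, so negated tuples are the safer choice when the type is shared with other code.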

LC 347 Top k frequent elements

This is very similar to the problem above, but when doing it in C++, you cannot rely on the nlargest function from heapq. Therefore, the generic solution is:

  1. Create a hash map of occurrences -> O(N)
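  2. Push the M distinct values from that hash map onto a max-heap ordered by count, making a custom comparator if required -> O(M log M) (a min-heap capped at k entries brings this down to O(M log k))
  3. Pop k elements from the heap into a vector -> O(k log M)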
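
For contrast, a sketch of the Python shortcut that the C++ version has to spell out by hand. It assumes collections.Counter for step 1 and lets heapq.nlargest cover steps 2 and 3. The function name is made up for the example.

```python
import heapq
from collections import Counter


def top_k_frequent(nums, k):
    counts = Counter(nums)  # step 1: hash map of occurrences
    # Steps 2 and 3: pick the k keys with the highest counts.
    return heapq.nlargest(k, counts, key=counts.get)
```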

C++ Solution

cpp
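#include <queue>
#include <unordered_map>
#include <vector>

class Solution {
public:
    std::vector<int> topKFrequent(std::vector<int>& nums, int k) {
        // Count occurrences of each value
        std::unordered_map<int, int> counts;
        for (int item : nums)
            counts[item] += 1;

        // Max-heap of values ordered by count: the most frequent value is on top
        auto comp = [&counts](int a, int b) { return counts.at(a) < counts.at(b); };
        std::priority_queue<int, std::vector<int>, decltype(comp)> heap(comp);

        for (const auto& pair : counts) {
            heap.push(pair.first);
        }

        // Pop the k most frequent values
        std::vector<int> output;
        for (int i = 0; i < k; i++) {
            output.push_back(heap.top());
            heap.pop();
        }
        return output;
    }
};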

Rust Solution

rust
use std::collections as c;

impl Solution {
    pub fn top_k_frequent(nums: Vec<i32>, k: i32) -> Vec<i32> {
        // Create hash map of occurrences
        let mut map = c::HashMap::new();
        for num in nums.iter() {
            *map.entry(num).or_insert(0) += 1;
        }

        // Create a heap from the hash map, limiting to k entries.
        // Counts are negated so the max-heap pops the least frequent entry first.
        let mut heap = c::BinaryHeap::new();
        for x in map {
            heap.push((-1 * x.1, x.0));

            if heap.len() > k as usize {
                heap.pop().unwrap();
            }
        }

        // Return the k elements left in the heap (in arbitrary order)
        let vec: Vec<i32> = heap.into_iter().map(|x| x.1.to_owned()).collect();
        vec
    }
}

LC 973 K Closest Points to Origin

This problem also has a sorting solution, see LC 973 K Closest Points to Origin.

Solution: Create a result list and a namedtuple to store values in the result with both the distance and the point. Next, loop over the points, calculate each distance, and push the pairs onto the result list, which doubles as the heap. Since heapq uses a min-heap by default, this is perfect. Finally, return only the top k points using a list comprehension.

python
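from collections import namedtuple
from heapq import heappush, heappop
from typing import List

class Solution:
    def kClosest(self, points: List[List[int]], k: int) -> List[List[int]]:
        # Pair each point with its distance so the heap orders by distance first
        distance_point = namedtuple('D', ('distance', 'point'))
        result = []

        for p in points:
            x, y = p
            distance = ( x**2 + y**2 )**(1/2)
            heappush(result, distance_point(distance, p))

        # Pop the k smallest distances off the min-heap
        return [heappop(result).point for _ in range(k)]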

Optimization: Only keep k elements in the heap.
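
A sketch of what that optimization could look like, assuming negated squared distances so that heap[0] is always the farthest point currently kept. The function name is illustrative, and skipping the square root is an extra shortcut that does not change the ordering.

```python
import heapq
from typing import List


def k_closest(points: List[List[int]], k: int) -> List[List[int]]:
    if k <= 0:
        return []
    heap = []  # entries are (-squared_distance, point)
    for x, y in points:
        entry = (-(x * x + y * y), [x, y])
        if len(heap) < k:
            heapq.heappush(heap, entry)
        else:
            # Push the new point, then drop whichever point is now farthest.
            heapq.heappushpop(heap, entry)
    return [point for _, point in heap]
```

This keeps the heap at k entries, so the work drops from O(n log n) to O(n log k) and the extra space to O(k). The result comes back in heap order, not sorted by distance.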