Table of Contents
- Introduction
- Heap Fundamentals
- Types of Heaps
- Heap Representation
- Python heapq Module
- Min-Heap Implementation from Scratch
- Max-Heap Implementation from Scratch
- Heap Operations and Time Complexity
- Priority Queue using Heap
- Common Heap Patterns
- Classic Heap Problems
- Advanced Topics
- Best Practices and Tips
Introduction
A Heap is a specialized tree-based data structure that satisfies the heap property. It is a complete binary tree where each parent node follows a specific ordering relationship with its children. Heaps are most commonly used to implement Priority Queues and efficient sorting algorithms.
Real-World Analogies
- Hospital Triage: Patients are treated based on severity (priority), not arrival order
- Task Scheduler: Operating systems schedule processes by priority
- Event-Driven Simulation: Events processed in chronological order
- Airline Boarding: First class boards before economy (priority-based)
- Dijkstra’s GPS Navigation: Always processes the closest unvisited node first
Key Characteristics
- Complete Binary Tree: All levels are filled except possibly the last, which fills left to right
- Heap Property: Parent always has a specific ordering relationship with its children
- Efficient Priority Access: O(1) access to the max or min element
- Array-Based: Efficiently stored in a flat array (no pointers needed)
- Not Fully Sorted: Only guarantees root is min/max, not full ordering
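To make the heap property and the "not fully sorted" point concrete, here is a minimal sketch of a validity check. It assumes the usual zero-based array layout, where the children of index i sit at 2*i + 1 and 2*i + 2; the function name `is_min_heap` is illustrative and not part of any library.

```python
def is_min_heap(arr):
    """Return True if arr satisfies the min-heap property in array form."""
    n = len(arr)
    for i in range(n):
        left, right = 2 * i + 1, 2 * i + 2
        # Each existing child must be >= its parent
        if left < n and arr[i] > arr[left]:
            return False
        if right < n and arr[i] > arr[right]:
            return False
    return True


valid = [1, 3, 2, 6, 5, 4]
print(is_min_heap(valid))        # True: every parent <= its children
print(valid == sorted(valid))    # False: a valid heap need not be sorted
print(is_min_heap([3, 1, 2]))    # False: root 3 is larger than child 1
```

The check only compares each parent with its own children, which is all the heap property requires; siblings and cousins may appear in any order.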
Why Use Heaps?
| Problem | Without Heap | With Heap |
|---|---|---|
| Find min/max repeatedly | O(n) each | O(1) to peek, O(log n) to remove |
| K largest/smallest elements | O(n log n) | O(n log k) |
| Priority Queue operations | O(n) | O(log n) |
| Merge K sorted lists | O(nk) | O(n log k) |
| Median of data stream | O(n) | O(log n) |
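As a rough illustration of the "K largest" row, the sketch below computes the same answer two ways: by sorting everything, and with `heapq.nlargest`, the heap-based approach behind the O(n log k) figure. The sample data and variable names are made up for the example.

```python
import heapq
import random

random.seed(0)
nums = [random.randint(0, 1_000_000) for _ in range(10_000)]
k = 5

by_sorting = sorted(nums, reverse=True)[:k]   # sorts all n elements: O(n log n)
by_heap = heapq.nlargest(k, nums)             # heap-based selection: O(n log k)

print(by_sorting == by_heap)  # True
```

Both calls return the k largest values in descending order; the heap version avoids ordering the elements that can never make it into the top k.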
Heap Fundamentals
Heap Property
MIN-HEAP: Parent ≤ Children        MAX-HEAP: Parent ≥ Children
        1                                  9
      /   \                              /   \
     3     2                            7     8
    / \   /                            / \   / \
   6   5 4                            2   4 5   1
Complete Binary Tree Rules
✅ Valid Complete Binary Tree       ❌ Invalid (gap in last level)
        1                                  1
      /   \                              /   \
     3     2                            3     2
    / \   /                            /     / \
   6   5 4                            6     4   5
Array Representation
A heap can be stored in an array using this index mapping:
Heap Array: [1, 3, 2, 6, 5, 4]
Index:       0  1  2  3  4  5
For node at index i:
    Left Child  = 2*i + 1
    Right Child = 2*i + 2
    Parent      = (i - 1) // 2
# Visual mapping for heap at index i:
def parent(i): return (i - 1) // 2
def left_child(i): return 2 * i + 1
def right_child(i): return 2 * i + 2
# Example: Array [10, 20, 15, 40, 50, 100, 25]
#
#          10            ← index 0
#        /    \
#      20      15        ← index 1, 2
#     /  \    /  \
#   40   50 100   25     ← index 3, 4, 5, 6
Types of Heaps
1. Min-Heap
The smallest element is always at the root. Every parent is less than or equal to its children.
        1
      /   \
     3     2
    / \   / \
   6   5 4   7
Min element (1) is always at the top ← O(1) access
Use Cases:
- Dijkstra’s shortest path
- Finding K smallest elements
- Scheduling tasks with minimum priority first
2. Max-Heap
The largest element is always at the root. Every parent is greater than or equal to its children.
        9
      /   \
     7     8
    / \   / \
   2   4 5   1
Max element (9) is always at the top ← O(1) access
Use Cases:
- Heap Sort
- Finding K largest elements
- Scheduling tasks with maximum priority first
3. Min-Max Heap
Alternating levels are min-levels and max-levels. Supports both O(1) min and max access.
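A minimal sketch of what "alternating levels" means in array form, assuming the same zero-based layout as a binary heap with the root on a min-level. The helper names (`level`, `descendants`, `is_min_max_heap`, `find_min`, `find_max`) are illustrative only, and insertion and deletion are deliberately left out.

```python
def level(i):
    """Depth of index i in the implicit tree (root is level 0)."""
    return (i + 1).bit_length() - 1


def descendants(i, n):
    """Yield the indices of all descendants of node i in a heap of size n."""
    stack = [2 * i + 1, 2 * i + 2]
    while stack:
        j = stack.pop()
        if j < n:
            yield j
            stack.extend((2 * j + 1, 2 * j + 2))


def is_min_max_heap(h):
    """Even levels must be <= all descendants, odd levels >= all descendants."""
    n = len(h)
    for i in range(n):
        on_min_level = level(i) % 2 == 0
        for j in descendants(i, n):
            if on_min_level and h[i] > h[j]:
                return False
            if not on_min_level and h[i] < h[j]:
                return False
    return True


def find_min(h):
    return h[0]           # the root sits on a min-level


def find_max(h):
    return max(h[:3])     # the root if it is alone, else the larger child of the root


h = [1, 9, 8, 3, 4, 2, 5]
print(is_min_max_heap(h), find_min(h), find_max(h))  # True 1 9
```

Both lookups touch at most three array slots, which is where the O(1) min and max access comes from. The functions assume a non-empty heap.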
4. Fibonacci Heap
Advanced heap with better amortized complexity for decrease-key and merge operations.
| Heap Type | Insert | Extract Min/Max | Decrease Key | Find Min/Max |
|---|---|---|---|---|
| Binary Heap | O(log n) | O(log n) | O(log n) | O(1) |
| Fibonacci Heap | O(1)* | O(log n)* | O(1)* | O(1) |
*Amortized
Heap vs Other Data Structures
| Operation | Unsorted Array | BST (balanced) | Heap |
|---|---|---|---|
| Insert | O(1) | O(log n) | O(log n) |
| Delete min | O(n) | O(log n) | O(log n) |
| Find min | O(n) | O(log n) | O(1) |
| Search | O(n) | O(log n) | O(n) |
| Build from list | O(n) | O(n log n) | O(n) |
Heap Representation
Index Relationships
class HeapIndexHelper:
    """Demonstrates heap array index relationships"""

    @staticmethod
    def parent(i):
        """Index of parent node"""
        return (i - 1) // 2

    @staticmethod
    def left(i):
        """Index of left child"""
        return 2 * i + 1

    @staticmethod
    def right(i):
        """Index of right child"""
        return 2 * i + 2

    @staticmethod
    def is_root(i):
        return i == 0

    @staticmethod
    def has_left(i, size):
        return 2 * i + 1 < size

    @staticmethod
    def has_right(i, size):
        return 2 * i + 2 < size

# Example visualization
def visualize_heap_array(arr):
    """Show the array and each node's parent/child indices (None = no such node)"""
    print("Array:", arr)
    print("\nIndex relationships:")
    for i in range(len(arr)):
        parent = (i - 1) // 2 if i > 0 else None
        left = 2 * i + 1 if 2 * i + 1 < len(arr) else None
        right = 2 * i + 2 if 2 * i + 2 < len(arr) else None
        print(f"  index {i} (value={arr[i]}): parent={parent}, left={left}, right={right}")

heap = [1, 3, 2, 6, 5, 4]
visualize_heap_array(heap)
Output:
Array: [1, 3, 2, 6, 5, 4]

Index relationships:
  index 0 (value=1): parent=None, left=1, right=2
  index 1 (value=3): parent=0, left=3, right=4
  index 2 (value=2): parent=0, left=5, right=None
  index 3 (value=6): parent=1, left=None, right=None
  index 4 (value=5): parent=1, left=None, right=None
  index 5 (value=4): parent=2, left=None, right=None
Python heapq Module
Python’s built-in heapq module provides a min-heap implementation on top of regular Python lists.
Basic Operations
import heapq
# heapq works on regular lists
heap = []
# Push elements - O(log n)
heapq.heappush(heap, 10)
heapq.heappush(heap, 5)
heapq.heappush(heap, 20)
heapq.heappush(heap, 1)
heapq.heappush(heap, 8)
print(heap) # [1, 5, 20, 10, 8] ← min-heap array
print(heap[0]) # 1 ← minimum element always at index 0
# Pop minimum element - O(log n)
min_val = heapq.heappop(heap)
print(min_val) # 1
print(heap) # [5, 8, 20, 10]
# Peek at minimum without removing - O(1)
print(heap[0]) # 5
# Push and pop in a single call (more efficient than a separate heappush + heappop)
result = heapq.heappushpop(heap, 3) # Push 3, then pop and return min
print(result) # 3 (3 was less than min 5, so 3 is returned)
result = heapq.heapreplace(heap, 3) # Pop min, then push 3
print(result) # 5
Building a Heap from a List
import heapq
# Method 1: heapify - converts list to heap IN PLACE - O(n)
nums = [64, 34, 25, 12, 22, 11, 90]
heapq.heapify(nums)
print(nums) # [11, 12, 25, 34, 22, 64, 90]
# Method 2: Push one by one - O(n log n)
heap = []
for num in [64, 34, 25, 12, 22, 11, 90]:
    heapq.heappush(heap, num)
print(heap) # [11, 22, 12, 64, 25, 34, 90] ← a different layout, but also a valid min-heap
# Note: heapify is preferred - it's O(n) vs O(n log n)
Useful heapq Functions
import heapq
nums = [64, 34, 25, 12, 22, 11, 90, 5, 43]
# nlargest - returns k largest elements - O(n log k)
print(heapq.nlargest(3, nums)) # [90, 64, 43]
# nsmallest - returns k smallest elements - O(n log k)
print(heapq.nsmallest(3, nums)) # [5, 11, 12]
# With key function
words = ["banana", "apple", "cherry", "date", "elderberry"]
print(heapq.nsmallest(2, words, key=len)) # ['date', 'apple'] ← by length
print(heapq.nlargest(2, words, key=len)) # ['elderberry', 'banana']
# Merge multiple sorted iterables - O(n log k)
sorted1 = [1, 4, 7]
sorted2 = [2, 5, 8]
sorted3 = [3, 6, 9]
merged = list(heapq.merge(sorted1, sorted2, sorted3))
print(merged) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
Max-Heap Using heapq (Negation Trick)
import heapq
# Python's heapq only provides min-heap
# For max-heap: negate all values
max_heap = []
for num in [10, 30, 20, 5, 50]:
    heapq.heappush(max_heap, -num) # Push negated values
print(max_heap) # [-50, -30, -20, -5, -10]
print(-max_heap[0]) # 50 ← actual max is negated root
# Pop maximum
max_val = -heapq.heappop(max_heap)
print(max_val) # 50
# For tuples with priority (here a larger number means more urgent)
tasks = [("low", 1), ("critical", 3), ("medium", 2)]
heap = []
for task, priority in tasks:
    heapq.heappush(heap, (-priority, task)) # Negate priority for max-heap
print(heapq.heappop(heap)) # (-3, 'critical') ← highest priority (3) first
Heap with Custom Objects
import heapq
# Method 1: Use tuples (priority, item)
heap = []
heapq.heappush(heap, (1, "wash dishes"))
heapq.heappush(heap, (3, "watch TV"))
heapq.heappush(heap, (2, "do homework"))
while heap:
    priority, task = heapq.heappop(heap)
    print(f"Priority {priority}: {task}")
# Output:
# Priority 1: wash dishes
# Priority 2: do homework
# Priority 3: watch TV

# Method 2: Use a dataclass with order=True (generates __lt__ and the other comparisons)
from dataclasses import dataclass, field

@dataclass(order=True)
class Task:
    priority: int
    name: str = field(compare=False)  # excluded from comparisons; only priority is compared

    def __repr__(self):
        return f"Task({self.name!r}, priority={self.priority})"

heap = []
heapq.heappush(heap, Task(3, "Low priority"))
heapq.heappush(heap, Task(1, "Urgent"))
heapq.heappush(heap, Task(2, "Normal"))
while heap:
    print(heapq.heappop(heap))
# Output:
# Task('Urgent', priority=1)
# Task('Normal', priority=2)
# Task('Low priority', priority=3)
Min-Heap Implementation from Scratch
class MinHeap:
    """
    Min-Heap implementation from scratch.
    The minimum element is always at the root (index 0).
    Heap Property: parent <= both children
    """

    def __init__(self):
        self.heap = []

    def __len__(self):
        return len(self.heap)

    def __repr__(self):
        return f"MinHeap({self.heap})"

    def is_empty(self):
        return len(self.heap) == 0

    def peek(self):
        """Return minimum element without removing - O(1)"""
        if self.is_empty():
            raise IndexError("Heap is empty")
        return self.heap[0]

    def _parent(self, i):
        return (i - 1) // 2

    def _left(self, i):
        return 2 * i + 1

    def _right(self, i):
        return 2 * i + 2

    def _swap(self, i, j):
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]

    def _sift_up(self, i):
        """
        Bubble element up to restore heap property after insertion.
        Compare with parent, swap if smaller. Repeat until root or heap property satisfied.
        Time: O(log n)
        """
        while i > 0:
            parent = self._parent(i)
            if self.heap[i] < self.heap[parent]:
                self._swap(i, parent)
                i = parent
            else:
                break # Heap property satisfied

    def _sift_down(self, i):
        """
        Push element down to restore heap property after extraction.
        Compare with smaller child, swap if larger. Repeat until leaf or heap property satisfied.
        Time: O(log n)
        """
        size = len(self.heap)
        while True:
            smallest = i
            left = self._left(i)
            right = self._right(i)
            if left < size and self.heap[left] < self.heap[smallest]:
                smallest = left
            if right < size and self.heap[right] < self.heap[smallest]:
                smallest = right
            if smallest != i:
                self._swap(i, smallest)
                i = smallest
            else:
                break # Heap property satisfied

    def push(self, value):
        """
        Insert a new value into the heap - O(log n)
        1. Add to end of array
        2. Sift up to restore heap property
        """
        self.heap.append(value)
        self._sift_up(len(self.heap) - 1)

    def pop(self):
        """
        Remove and return the minimum element - O(log n)
        1. Swap root with last element
        2. Remove last element (was root)
        3. Sift down to restore heap property
        """
        if self.is_empty():
            raise IndexError("Heap is empty")
        # Swap root (minimum) with last element
        self._swap(0, len(self.heap) - 1)
        min_val = self.heap.pop() # Remove from end (original root)
        if self.heap:
            self._sift_down(0) # Restore heap property
        return min_val

    def push_pop(self, value):
        """Push value then pop min - O(log n). More efficient than separate calls."""
        if self.heap and self.heap[0] < value:
            value, self.heap[0] = self.heap[0], value
            self._sift_down(0)
        return value

    def replace(self, value):
        """Pop min then push value - O(log n)."""
        if self.is_empty():
            raise IndexError("Heap is empty")
        min_val = self.heap[0]
        self.heap[0] = value
        self._sift_down(0)
        return min_val

    def heapify(self, iterable):
        """
        Build heap from any iterable - O(n)
        Uses Floyd's algorithm: sift down from last internal node to root.
        More efficient than inserting one-by-one (O(n) vs O(n log n))
        """
        self.heap = list(iterable)
        # Start from last internal node and sift down each node
        # Last internal node index = (n // 2) - 1
        for i in range(len(self.heap) // 2 - 1, -1, -1):
            self._sift_down(i)

    def size(self):
        return len(self.heap)

    def get_sorted(self):
        """Extract all elements in sorted order - O(n log n)"""
        import copy
        heap_copy = MinHeap()
        heap_copy.heap = copy.deepcopy(self.heap)
        result = []
        while not heap_copy.is_empty():
            result.append(heap_copy.pop())
        return result

# Usage Example
min_heap = MinHeap()
for val in [10, 5, 20, 1, 8, 3]:
    min_heap.push(val)
print(min_heap) # MinHeap([1, 5, 3, 10, 8, 20])
print(min_heap.peek()) # 1
print(min_heap.pop()) # 1
print(min_heap.pop()) # 3
print(min_heap) # MinHeap([5, 8, 20, 10])
# Build from list using heapify
h = MinHeap()
h.heapify([64, 34, 25, 12, 22, 11, 90])
print(h) # MinHeap([11, 12, 25, 34, 22, 64, 90])
print(h.get_sorted()) # [11, 12, 22, 25, 34, 64, 90]
Max-Heap Implementation from Scratch
class MaxHeap:
    """
    Max-Heap implementation from scratch.
    The maximum element is always at the root (index 0).
    Heap Property: parent >= both children
    """

    def __init__(self):
        self.heap = []

    def __len__(self):
        return len(self.heap)

    def __repr__(self):
        return f"MaxHeap({self.heap})"

    def is_empty(self):
        return len(self.heap) == 0

    def peek(self):
        """Return maximum element without removing - O(1)"""
        if self.is_empty():
            raise IndexError("Heap is empty")
        return self.heap[0]

    def _parent(self, i):
        return (i - 1) // 2

    def _left(self, i):
        return 2 * i + 1

    def _right(self, i):
        return 2 * i + 2

    def _swap(self, i, j):
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]

    def _sift_up(self, i):
        """Bubble element up - swap with parent while larger than parent"""
        while i > 0:
            parent = self._parent(i)
            if self.heap[i] > self.heap[parent]: # Only difference from MinHeap
                self._swap(i, parent)
                i = parent
            else:
                break

    def _sift_down(self, i):
        """Push element down - swap with largest child"""
        size = len(self.heap)
        while True:
            largest = i
            left = self._left(i)
            right = self._right(i)
            if left < size and self.heap[left] > self.heap[largest]: # Only difference
                largest = left
            if right < size and self.heap[right] > self.heap[largest]: # Only difference
                largest = right
            if largest != i:
                self._swap(i, largest)
                i = largest
            else:
                break

    def push(self, value):
        """Insert value - O(log n)"""
        self.heap.append(value)
        self._sift_up(len(self.heap) - 1)

    def pop(self):
        """Remove and return maximum - O(log n)"""
        if self.is_empty():
            raise IndexError("Heap is empty")
        self._swap(0, len(self.heap) - 1)
        max_val = self.heap.pop()
        if self.heap:
            self._sift_down(0)
        return max_val

    def heapify(self, iterable):
        """Build max-heap from iterable - O(n)"""
        self.heap = list(iterable)
        for i in range(len(self.heap) // 2 - 1, -1, -1):
            self._sift_down(i)

    def size(self):
        return len(self.heap)

    def heap_sort(self, arr):
        """
        Sort array in ascending order using max-heap - O(n log n)
        1. Build max-heap from array
        2. Repeatedly extract max and place at end
        """
        self.heapify(arr)
        result = []
        while not self.is_empty():
            result.append(self.pop())
        return result[::-1] # Reverse since we extracted from largest to smallest

# Usage Example
max_heap = MaxHeap()
for val in [10, 5, 20, 1, 8, 3]:
    max_heap.push(val)
print(max_heap) # MaxHeap([20, 8, 10, 1, 5, 3])
print(max_heap.peek()) # 20
print(max_heap.pop()) # 20
print(max_heap.pop()) # 10
print(max_heap) # MaxHeap([8, 5, 3, 1])
# Heap sort
h = MaxHeap()
print(h.heap_sort([64, 34, 25, 12, 22, 11, 90])) # [11, 12, 22, 25, 34, 64, 90]
Heap Operations and Time Complexity
Time Complexity Summary
| Operation | Time Complexity | Notes |
|---|---|---|
| peek / find-min | O(1) | Root is always min/max |
| push / insert | O(log n) | Sift up from leaf to root |
| pop / extract | O(log n) | Sift down from root to leaf |
| heapify | O(n) | Floyd’s algorithm (better than O(n log n)) |
| delete (any) | O(log n) | Given the element's index: replace it with the last element, then sift up/down (locating the element first is O(n)) |
| decrease-key | O(log n) | Update then sift up |
| increase-key | O(log n) | Update then sift down |
| merge | O(n) | Build new heap from combined arrays |
| search | O(n) | No ordering guarantee beyond root |
| nlargest(k) | O(n log k) | heapq.nlargest() |
| nsmallest(k) | O(n log k) | heapq.nsmallest() |
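The "delete (any)" and "decrease-key" rows can be sketched on a plain list-based min-heap. This is an illustrative sketch, not the `heapq` API: `heapq` exposes no public delete-at-index or decrease-key, so the helpers `sift_up`, `sift_down`, `decrease_key`, and `delete_at` below are hypothetical names, and the caller is assumed to already know the index of the element (finding it is the O(n) "search" row).

```python
def sift_up(heap, i):
    """Move heap[i] up while it is smaller than its parent."""
    while i > 0:
        parent = (i - 1) // 2
        if heap[i] < heap[parent]:
            heap[i], heap[parent] = heap[parent], heap[i]
            i = parent
        else:
            break


def sift_down(heap, i):
    """Move heap[i] down while it is larger than its smallest child."""
    n = len(heap)
    while True:
        smallest = i
        for child in (2 * i + 1, 2 * i + 2):
            if child < n and heap[child] < heap[smallest]:
                smallest = child
        if smallest == i:
            break
        heap[i], heap[smallest] = heap[smallest], heap[i]
        i = smallest


def decrease_key(heap, i, new_value):
    """Lower heap[i] to new_value, then restore order by sifting up."""
    if new_value > heap[i]:
        raise ValueError("new value is larger than the current key")
    heap[i] = new_value
    sift_up(heap, i)


def delete_at(heap, i):
    """Remove and return heap[i]: move the last element into the hole, then sift."""
    last = heap.pop()
    if i == len(heap):        # the removed slot was the last one
        return last
    removed, heap[i] = heap[i], last
    sift_up(heap, i)          # at most one of these two calls moves anything
    sift_down(heap, i)
    return removed


heap = [1, 3, 2, 6, 5, 4]
decrease_key(heap, 3, 0)      # 6 -> 0, which rises to the root
print(heap)                   # [0, 1, 2, 3, 5, 4]
print(delete_at(heap, 1))     # 1
print(heap)                   # [0, 3, 2, 4, 5]
```

Each helper walks a single root-to-leaf path at most, which is where the O(log n) bounds in the table come from.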
Space Complexity
- Storage: O(n) for a heap of n elements
- Heapify: O(1) extra space (in-place)
- Heap sort: O(1) extra space for the in-place variant; O(n) when the sorted output is built in a separate list
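A small check of the "in-place" claim for `heapq.heapify`: it rearranges the list it is given and returns `None`, so assigning its result is a common slip. The variable names are illustrative.

```python
import heapq

data = [9, 4, 7, 1, 8]
alias = data                   # second reference to the same list object

result = heapq.heapify(data)   # rearranges data in place

print(result)                  # None
print(alias is data)           # True: no new list was created
print(data[0])                 # 1: the minimum is now at index 0
```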
Why heapify is O(n) — Floyd’s Algorithm Explained
Insight: Most nodes are near the bottom and require little sift-down work.
For a heap of n elements:
- Nodes at height h: ≤ ⌈n / 2^(h+1)⌉
- Sift-down cost at height h: O(h)
Total work = Σ ⌈n / 2^(h+1)⌉ × O(h), summed over heights h = 0 … ⌊log₂ n⌋
= O(n × Σ h / 2^h)
= O(n × 2) (the series Σ h / 2^h converges to 2)
= O(n)
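The bound can also be checked empirically. The sketch below is a hand-written bottom-up (Floyd-style) heap construction that counts swaps; the function name `heapify_count_swaps` and the random test data are assumptions made for the illustration. Because each node moves down by at most its own height, and the heights in a heap-shaped tree sum to less than n, the swap count stays at or below n.

```python
import random


def heapify_count_swaps(arr):
    """Bottom-up min-heap construction; returns the number of swaps performed."""
    n = len(arr)
    swaps = 0
    # Sift down every internal node, from the last one back to the root
    for start in range(n // 2 - 1, -1, -1):
        i = start
        while True:
            smallest = i
            for child in (2 * i + 1, 2 * i + 2):
                if child < n and arr[child] < arr[smallest]:
                    smallest = child
            if smallest == i:
                break
            arr[i], arr[smallest] = arr[smallest], arr[i]
            swaps += 1
            i = smallest
    return swaps


random.seed(1)
for n in (10, 1_000, 100_000):
    data = [random.random() for _ in range(n)]
    swaps = heapify_count_swaps(data)
    print(n, swaps, swaps <= n)   # the last column is True for every n
```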
Comparison:
Build by inserting one by one: O(n log n)
heapify (Floyd's algorithm): O(n) ← always prefer this!
import heapq
import time

def build_by_insertion(nums):
    h = []
    for num in nums:
        heapq.heappush(h, num)
    return h

def build_by_heapify(nums):
    h = nums[:]
    heapq.heapify(h)
    return h

# Both produce valid min-heaps, but heapify is faster for large inputs
nums = list(range(100000, 0, -1)) # Worst case: reverse sorted
start = time.perf_counter()
build_by_insertion(nums)
t1 = time.perf_counter() - start
start = time.perf_counter()
build_by_heapify(nums)
t2 = time.perf_counter() - start
print(f"Insert one-by-one: {t1:.4f}s")
print(f"heapify: {t2:.4f}s")
print(f"heapify is ~{t1/t2:.1f}x faster")
Priority Queue using Heap
A Priority Queue is the most common application of heaps. Elements are dequeued in order of priority, not insertion order.
Using heapq
import heapq
class PriorityQueue:
    """
    Min-Priority Queue: lower number = higher priority
    Use negation for max-priority behavior.
    """

    def __init__(self):
        self._heap = []
        self._index = 0 # Tiebreaker for equal priorities (FIFO order)

    def push(self, item, priority):
        """Add item with given priority - O(log n)"""
        # (priority, index, item) ensures stable ordering when priority ties
        heapq.heappush(self._heap, (priority, self._index, item))
        self._index += 1

    def pop(self):
        """Remove and return highest priority item - O(log n)"""
        if self.is_empty():
            raise IndexError("Priority queue is empty")
        priority, _, item = heapq.heappop(self._heap)
        return item, priority

    def peek(self):
        """View highest priority item without removing - O(1)"""
        if self.is_empty():
            raise IndexError("Priority queue is empty")
        priority, _, item = self._heap[0]
        return item, priority

    def is_empty(self):
        return len(self._heap) == 0

    def size(self):
        return len(self._heap)

    def __repr__(self):
        items = [(item, p) for p, _, item in sorted(self._heap)]
        return f"PriorityQueue({items})"

# Example: Hospital Triage
pq = PriorityQueue()
pq.push("Alice - broken arm", priority=3)
pq.push("Bob - cardiac arrest", priority=1) # Highest priority (1 = critical)
pq.push("Carol - minor cut", priority=5)
pq.push("Dave - chest pain", priority=2)
print("Treating patients in priority order:")
while not pq.is_empty():
    patient, priority = pq.pop()
    print(f"  [Priority {priority}] {patient}")
# Output:
# [Priority 1] Bob - cardiac arrest
# [Priority 2] Dave - chest pain
# [Priority 3] Alice - broken arm
# [Priority 5] Carol - minor cut
Using queue.PriorityQueue (Thread-Safe)
from queue import PriorityQueue
# Built-in thread-safe priority queue
pq = PriorityQueue()
pq.put((1, "critical task"))
pq.put((3, "low priority task"))
pq.put((2, "medium task"))
while not pq.empty():
    priority, task = pq.get()
    print(f"Priority {priority}: {task}")
# Output:
# Priority 1: critical task
# Priority 2: medium task
# Priority 3: low priority task
Common Heap Patterns
Pattern 1: K Largest / K Smallest Elements
import heapq
def k_largest(nums, k):
    """
    Find k largest elements using min-heap of size k.
    Time: O(n log k) Space: O(k)
    Strategy: Maintain a min-heap of size k.
    If current element > heap root (smallest in top-k), replace root.
    """
    min_heap = []
    for num in nums:
        heapq.heappush(min_heap, num)
        if len(min_heap) > k:
            heapq.heappop(min_heap) # Remove smallest, keeping the k largest
    return sorted(min_heap, reverse=True)

def k_smallest(nums, k):
    """
    Find k smallest elements using max-heap of size k.
    Time: O(n log k) Space: O(k)
    """
    max_heap = []
    for num in nums:
        heapq.heappush(max_heap, -num)
        if len(max_heap) > k:
            heapq.heappop(max_heap) # Remove largest, keeping the k smallest
    return sorted(-x for x in max_heap)

# Test
nums = [3, 1, 5, 12, 2, 11, 8, 7]
print(k_largest(nums, 3)) # [12, 11, 8]
print(k_smallest(nums, 3)) # [1, 2, 3]
# Python shortcut using heapq helpers
print(heapq.nlargest(3, nums)) # [12, 11, 8]
print(heapq.nsmallest(3, nums)) # [1, 2, 3]
Pattern 2: Kth Largest / Kth Smallest
import heapq
def kth_largest(nums, k):
    """
    Find kth largest element.
    Time: O(n log k) Space: O(k)
    Maintain a min-heap of size k; root is kth largest.
    """
    min_heap = []
    for num in nums:
        heapq.heappush(min_heap, num)
        if len(min_heap) > k:
            heapq.heappop(min_heap)
    return min_heap[0] # Root of min-heap is kth largest

def kth_smallest(nums, k):
    """
    Find kth smallest element.
    Time: O(n log k) Space: O(k)
    Maintain a max-heap of size k; root is kth smallest.
    """
    max_heap = []
    for num in nums:
        heapq.heappush(max_heap, -num)
        if len(max_heap) > k:
            heapq.heappop(max_heap)
    return -max_heap[0] # Negate back to get actual value

nums = [3, 2, 1, 5, 6, 4]
print(kth_largest(nums, 2)) # 5 (2nd largest)
print(kth_smallest(nums, 2)) # 2 (2nd smallest)
Pattern 3: Merge K Sorted Lists
import heapq
def merge_k_sorted_lists(lists):
    """
    Merge k sorted lists into one sorted list.
    Time: O(n log k) where n = total elements, k = number of lists
    Space: O(k) for the heap
    Algorithm:
    1. Push first element from each list into min-heap
    2. Pop min, add to result
    3. Push next element from that list into heap
    """
    min_heap = []
    # Initialize heap with first element from each list
    for i, lst in enumerate(lists):
        if lst:
            heapq.heappush(min_heap, (lst[0], i, 0)) # (value, list_index, element_index)
    result = []
    while min_heap:
        val, list_idx, elem_idx = heapq.heappop(min_heap)
        result.append(val)
        # Push next element from the same list
        next_idx = elem_idx + 1
        if next_idx < len(lists[list_idx]):
            next_val = lists[list_idx][next_idx]
            heapq.heappush(min_heap, (next_val, list_idx, next_idx))
    return result

# Test
lists = [[1, 4, 7], [2, 5, 8], [3, 6, 9], [0, 10, 11]]
print(merge_k_sorted_lists(lists)) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Pattern 4: Two-Heap Pattern (Median of Data Stream)
import heapq
class MedianFinder:
    """
    Find median from a data stream efficiently.
    Strategy: Use two heaps:
    - max_heap (left half): stores smaller half, root = max of left half
    - min_heap (right half): stores larger half, root = min of right half
    Invariant: len(max_heap) == len(min_heap) OR len(max_heap) == len(min_heap) + 1
               max_heap root <= min_heap root
    Time: O(log n) per add, O(1) for get_median
    Space: O(n)
    """

    def __init__(self):
        self.max_heap = [] # Left half (negated for max behavior)
        self.min_heap = [] # Right half (standard min-heap)

    def add_num(self, num):
        # Step 1: Push to max_heap (left half)
        heapq.heappush(self.max_heap, -num)
        # Step 2: Balance - ensure max of left <= min of right
        if (self.max_heap and self.min_heap
                and -self.max_heap[0] > self.min_heap[0]):
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)
        # Step 3: Size balance - max_heap can have at most 1 extra
        if len(self.max_heap) > len(self.min_heap) + 1:
            val = -heapq.heappop(self.max_heap)
            heapq.heappush(self.min_heap, val)
        elif len(self.min_heap) > len(self.max_heap):
            val = heapq.heappop(self.min_heap)
            heapq.heappush(self.max_heap, -val)

    def get_median(self):
        if len(self.max_heap) > len(self.min_heap):
            return float(-self.max_heap[0])
        return (-self.max_heap[0] + self.min_heap[0]) / 2.0

# Test
mf = MedianFinder()
stream = [5, 15, 1, 3, 8, 10, 2]
for num in stream:
    mf.add_num(num)
    print(f"Added {num:2d} → median = {mf.get_median()}")
# Added  5 → median = 5.0
# Added 15 → median = 10.0
# Added  1 → median = 5.0
# Added  3 → median = 4.0
# Added  8 → median = 5.0
# Added 10 → median = 6.5
# Added  2 → median = 5.0
Pattern 5: Heap Sort
import heapq
def heap_sort_ascending(arr):
    """
    Heap sort using Python's heapq (min-heap).
    Time: O(n log n) Space: O(n)
    """
    heap = arr[:]
    heapq.heapify(heap)
    return [heapq.heappop(heap) for _ in range(len(heap))]

def heap_sort_inplace(arr):
    """
    In-place heap sort using max-heap.
    Time: O(n log n) Space: O(1)
    Algorithm:
    1. Build max-heap from array ← O(n)
    2. Repeatedly: swap root with last, reduce size, sift down ← O(n log n)
    """
    n = len(arr)

    def sift_down(arr, i, size):
        """Sift down within arr[0..size-1]"""
        while True:
            largest = i
            left, right = 2 * i + 1, 2 * i + 2
            if left < size and arr[left] > arr[largest]:
                largest = left
            if right < size and arr[right] > arr[largest]:
                largest = right
            if largest != i:
                arr[i], arr[largest] = arr[largest], arr[i]
                i = largest
            else:
                break

    # Phase 1: Build max-heap - O(n)
    for i in range(n // 2 - 1, -1, -1):
        sift_down(arr, i, n)
    # Phase 2: Extract elements one by one - O(n log n)
    for end in range(n - 1, 0, -1):
        arr[0], arr[end] = arr[end], arr[0] # Move current max to end
        sift_down(arr, 0, end) # Restore heap on reduced array
    return arr

# Test
arr = [64, 34, 25, 12, 22, 11, 90]
print(heap_sort_ascending(arr)) # [11, 12, 22, 25, 34, 64, 90]
print(heap_sort_inplace(arr[:])) # [11, 12, 22, 25, 34, 64, 90]
Classic Heap Problems
Problem 1: Top K Frequent Elements
import heapq
from collections import Counter
def top_k_frequent(nums, k):
    """
    Return k most frequent elements (in heap order, not sorted by frequency).
    Time: O(n log k) Space: O(n)
    """
    # Count frequencies
    count = Counter(nums) # O(n)
    # Use min-heap of size k based on frequency
    # (frequency, element): the min-heap pops the lowest frequency first
    heap = []
    for num, freq in count.items():
        heapq.heappush(heap, (freq, num))
        if len(heap) > k:
            heapq.heappop(heap) # Remove least frequent
    return [num for freq, num in heap]

# Test
print(top_k_frequent([1,1,1,2,2,3], 2)) # [2, 1] (heap order; any order is a valid answer)
print(top_k_frequent([1], 1)) # [1]
Problem 2: K Closest Points to Origin
import heapq
def k_closest(points, k):
    """
    Find k closest points to origin (0,0).
    Time: O(n log k) Space: O(k)
    Distance = sqrt(x² + y²) but we can compare x² + y² directly
    """
    max_heap = [] # Max-heap: keeps k smallest distances
    for x, y in points:
        dist_sq = x * x + y * y
        heapq.heappush(max_heap, (-dist_sq, x, y)) # Negate for max-heap
        if len(max_heap) > k:
            heapq.heappop(max_heap) # Remove farthest in current top-k
    return [[x, y] for _, x, y in max_heap]

# Test
points = [[1, 3], [-2, 2], [5, 8], [0, 1]]
print(k_closest(points, 2)) # [[-2, 2], [0, 1]] (the 2 points closest to the origin)
Problem 3: Task Scheduler
import heapq
from collections import Counter, deque

def least_interval(tasks, n):
    """
    CPU Task Scheduler: find min intervals to finish all tasks
    with cooldown of n intervals between same tasks.
    Time: O(L log T) where L = length of the resulting schedule
    and T = number of unique tasks
    """
    counts = Counter(tasks)
    max_heap = [-cnt for cnt in counts.values()]
    heapq.heapify(max_heap)
    time = 0
    cooldown = deque() # (available_time, count)
    while max_heap or cooldown:
        time += 1
        if max_heap:
            cnt = 1 + heapq.heappop(max_heap) # Process task (counts are negative, so +1 uses one up)
            if cnt:
                cooldown.append((time + n, cnt)) # Schedule when available again
        if cooldown and cooldown[0][0] == time:
            _, cnt = cooldown.popleft()
            heapq.heappush(max_heap, cnt)
    return time

# Test
print(least_interval(["A","A","A","B","B","B"], 2)) # 8
print(least_interval(["A","A","A","B","B","B"], 0)) # 6
Problem 4: Find Median from Data Stream
(Already covered in Pattern 4 — Two-Heap Pattern above)
Problem 5: Reorganize String (No Two Adjacent Same)
import heapq
def reorganize_string(s):
    """
    Rearrange string so no two adjacent characters are same.
    Return "" if not possible.
    Time: O(n log k) where k = number of unique characters
    """
    count = {}
    for c in s:
        count[c] = count.get(c, 0) + 1
    max_heap = [(-freq, char) for char, freq in count.items()]
    heapq.heapify(max_heap)
    result = []
    prev = None
    while max_heap:
        freq, char = heapq.heappop(max_heap)
        if prev and prev[0] < 0: # Re-add previous char if still has count
            heapq.heappush(max_heap, prev)
        result.append(char)
        prev = (freq + 1, char) # freq is negative, so +1 decreases count
    if len(result) != len(s):
        return "" # Not possible
    return "".join(result)

# Test
print(reorganize_string("aab")) # "aba"
print(reorganize_string("aaab")) # "" (impossible)
print(reorganize_string("aabb")) # "abab" or "baba"
Problem 6: Ugly Numbers (II)
import heapq
def nth_ugly_number(n):
    """
    Find nth ugly number. Ugly numbers have only prime factors 2, 3, 5.
    Time: O(n log n) Space: O(n)
    """
    min_heap = [1]
    seen = {1}
    ugly = 0
    for _ in range(n):
        ugly = heapq.heappop(min_heap)
        for prime in [2, 3, 5]:
            new_ugly = ugly * prime
            if new_ugly not in seen:
                seen.add(new_ugly)
                heapq.heappush(min_heap, new_ugly)
    return ugly

# First 10 ugly numbers: 1, 2, 3, 4, 5, 6, 8, 9, 10, 12
print(nth_ugly_number(10)) # 12
print(nth_ugly_number(1)) # 1
Problem 7: Sliding Window Maximum
import heapq
def sliding_window_maximum(nums, k):
    """
    Find maximum in each sliding window of size k.
    Time: O(n log n) Space: O(n)
    Uses max-heap with lazy deletion.
    """
    max_heap = []
    result = []
    for i, num in enumerate(nums):
        heapq.heappush(max_heap, (-num, i)) # (-value, index) for max-heap
        # Remove elements outside the window (lazy deletion)
        while max_heap[0][1] <= i - k:
            heapq.heappop(max_heap)
        if i >= k - 1: # Window is full
            result.append(-max_heap[0][0])
    return result

# Test
print(sliding_window_maximum([1,3,-1,-3,5,3,6,7], 3)) # [3,3,5,5,6,7]
print(sliding_window_maximum([1,-1], 1)) # [1,-1]
Problem 8: Connect Ropes with Minimum Cost
import heapq
def connect_ropes(ropes):
    """
    Connect all ropes into one with minimum cost.
    Cost to connect two ropes = sum of their lengths.
    Time: O(n log n) Space: O(n)
    Greedy: always connect the two shortest ropes.
    Note: the input list is heapified and consumed in place.
    """
    heapq.heapify(ropes)
    total_cost = 0
    while len(ropes) > 1:
        first = heapq.heappop(ropes)
        second = heapq.heappop(ropes)
        cost = first + second
        total_cost += cost
        heapq.heappush(ropes, cost)
    return total_cost

# Test
print(connect_ropes([4, 3, 2, 6])) # 29
# Optimal: (2+3)=5, (4+5)=9, (6+9)=15 → total = 5+9+15 = 29
Advanced Topics
Decrease Key Operation
import heapq
class HeapWithDecreaseKey:
    """
    Min-heap supporting decrease-key via lazy deletion.
    Used in Dijkstra's and Prim's algorithms.
    """

    def __init__(self):
        self._heap = []
        self._entry_map = {} # key → entry in heap
        self._REMOVED = "<removed>"
        self._counter = 0

    def push(self, item, priority):
        """Add item or update priority if item already exists"""
        if item in self._entry_map:
            self._remove(item)
        entry = [priority, self._counter, item]
        self._counter += 1
        self._entry_map[item] = entry
        heapq.heappush(self._heap, entry)

    def _remove(self, item):
        """Mark existing entry as removed (lazy deletion)"""
        entry = self._entry_map.pop(item)
        entry[-1] = self._REMOVED

    def pop(self):
        """Remove and return lowest priority item"""
        while self._heap:
            priority, _, item = heapq.heappop(self._heap)
            if item is not self._REMOVED:
                del self._entry_map[item]
                return item, priority
        raise IndexError("Heap is empty")

    def decrease_key(self, item, new_priority):
        """Update item's priority to a smaller value"""
        self.push(item, new_priority)

    def is_empty(self):
        return len(self._entry_map) == 0

# Example: Dijkstra's Shortest Path
def dijkstra(graph, start):
    """
    Find shortest path from start to all vertices.
    graph: {node: [(neighbor, weight), ...]}
    Time: O((V + E) log V)
    """
    distances = {node: float('inf') for node in graph}
    distances[start] = 0
    pq = HeapWithDecreaseKey()
    pq.push(start, 0)
    while not pq.is_empty():
        node, dist = pq.pop()
        if dist > distances[node]:
            continue # Outdated entry
        for neighbor, weight in graph[node]:
            new_dist = distances[node] + weight
            if new_dist < distances[neighbor]:
                distances[neighbor] = new_dist
                pq.push(neighbor, new_dist)
    return distances

# Test
graph = {
    'A': [('B', 4), ('C', 2)],
    'B': [('D', 3), ('C', 1)],
    'C': [('B', 1), ('D', 5)],
    'D': []
}
print(dijkstra(graph, 'A'))
# {'A': 0, 'B': 3, 'C': 2, 'D': 6}
D-ary Heap
class DaryHeap:
    """
    D-ary min-heap: each node has at most d children.
    D=2 → Binary heap, D=4 → 4-ary heap.
    Trade-offs:
    - Smaller height: log_d(n) vs log_2(n)
    - Fewer sift-up operations (insert: O(log_d n))
    - But sift-down examines d children (extract: O(d * log_d n))
    - Often better cache behavior than a binary heap, since a node's children are adjacent in memory
    """

    def __init__(self, d=4):
        self.heap = []
        self.d = d

    def _parent(self, i):
        return (i - 1) // self.d

    def _children(self, i):
        return range(self.d * i + 1, min(self.d * i + self.d + 1, len(self.heap)))

    def push(self, val):
        self.heap.append(val)
        i = len(self.heap) - 1
        while i > 0:
            p = self._parent(i)
            if self.heap[i] < self.heap[p]:
                self.heap[i], self.heap[p] = self.heap[p], self.heap[i]
                i = p
            else:
                break

    def pop(self):
        if not self.heap:
            raise IndexError("Heap is empty")
        self.heap[0], self.heap[-1] = self.heap[-1], self.heap[0]
        val = self.heap.pop()
        i = 0
        while True:
            children = list(self._children(i))
            if not children:
                break
            smallest_child = min(children, key=lambda c: self.heap[c])
            if self.heap[smallest_child] < self.heap[i]:
                self.heap[i], self.heap[smallest_child] = self.heap[smallest_child], self.heap[i]
                i = smallest_child
            else:
                break
        return val

# Test
h = DaryHeap(d=4)
for v in [10, 4, 15, 1, 3, 8]:
    h.push(v)
print(h.heap) # [1, ...]
print(h.pop()) # 1
print(h.pop()) # 3
Indexed Priority Queue
import heapq

class IndexedPriorityQueue:
    """
    Priority queue with updatable keys, implemented via lazy deletion.
    Useful for Dijkstra's and Prim's when keys need to be updated.
    Note: a true indexed priority queue tracks each item's heap position so it
    can sift in place. heapq does not expose positions, so this version pushes
    a fresh entry on update and skips stale entries on pop.
    """
    def __init__(self):
        self.heap = []        # (priority, item_id); may contain stale entries
        self.priorities = {}  # item_id → current priority

    def push(self, item_id, priority):
        heapq.heappush(self.heap, (priority, item_id))
        self.priorities[item_id] = priority

    def update_priority(self, item_id, new_priority):
        """Update priority with an O(log n) push; the old entry becomes stale"""
        self.priorities[item_id] = new_priority
        heapq.heappush(self.heap, (new_priority, item_id))

    def pop(self):
        while self.heap:
            priority, item_id = heapq.heappop(self.heap)
            # Skip if not current priority (lazy deletion)
            if self.priorities.get(item_id) == priority:
                del self.priorities[item_id]
                return item_id, priority
        raise IndexError("Empty")

    def is_empty(self):
        return not self.priorities
Comparison: heapq vs queue.PriorityQueue vs custom
# Summary of when to use each:
# 1. heapq (most common)
#    - Single-threaded applications
#    - Need direct access to underlying list
#    - Memory efficient, fast
#    - Default choice for competitive programming
# 2. queue.PriorityQueue
#    - Multi-threaded applications
#    - Need thread-safe operations
#    - Slightly slower due to locks
# 3. Custom MinHeap / MaxHeap class
#    - Need decrease-key operation
#    - Want object-oriented interface
#    - Educational purposes
#    - When heapq API is insufficient
import heapq
from queue import PriorityQueue
import timeit

def bench_heapq(n=10000):
    h = []
    for i in range(n, 0, -1):
        heapq.heappush(h, i)
    for _ in range(n):
        heapq.heappop(h)

def bench_pq(n=10000):
    pq = PriorityQueue()
    for i in range(n, 0, -1):
        pq.put(i)
    for _ in range(n):
        pq.get()

t1 = timeit.timeit(lambda: bench_heapq(10000), number=10)
t2 = timeit.timeit(lambda: bench_pq(10000), number=10)
print(f"heapq:               {t1:.3f}s")
print(f"queue.PriorityQueue: {t2:.3f}s")
print(f"heapq is ~{t2/t1:.1f}x faster (no thread-safety overhead)")
Best Practices and Tips
1. Always Use heapify Instead of Repeated Push
import heapq
data = [5, 3, 8, 1, 9, 2, 7]
# ❌ Slow: O(n log n)
heap1 = []
for x in data:
    heapq.heappush(heap1, x)
# ✅ Fast: O(n); prefer this when building from existing data
heap2 = data[:]
heapq.heapify(heap2)
2. Max-Heap via Negation
import heapq
# heapq implements a min-heap (explicit max-heap functions were only added in Python 3.14)
# For a max-heap on any version, negate values on push and negate again on pop
max_heap = []
heapq.heappush(max_heap, -10)
heapq.heappush(max_heap, -5)
heapq.heappush(max_heap, -20)
max_val = -heapq.heappop(max_heap)  # 20 ← always negate on pop
print(max_val)  # 20
3. Use Tuples for Complex Priorities
import heapq
import itertools
# Use (priority, tiebreaker, item) pattern
counter = itertools.count()  # Monotonically increasing tiebreaker
heap = []
heapq.heappush(heap, (2, next(counter), "task B")) # secondary sort avoids comparison errors
heapq.heappush(heap, (1, next(counter), "task A"))
heapq.heappush(heap, (2, next(counter), "task C")) # same priority as B
# Without counter: if priority ties, Python compares items — may fail for non-comparable objects
# With counter: guarantees stable ordering (FIFO for equal priorities)
print(heapq.heappop(heap)) # (1, 1, 'task A')
print(heapq.heappop(heap))  # (2, 0, 'task B') ← B before C (inserted first)
4. nlargest / nsmallest vs sorted()
import heapq
data = list(range(1000000))
# For very small k: heapq.nlargest/nsmallest is O(n log k) — preferred
# For k close to n: sorted() is O(n log n) — often faster in practice
k = 5
# ✅ Best for small k (k << n)
result = heapq.nlargest(k, data)
# ✅ Best for large k (k close to n) or when you need full sort
result = sorted(data, reverse=True)[:k]
# Rough rule of thumb: if k < n / 10, try heapq first; otherwise use sorted(). Benchmark when it matters.
5. Know When NOT to Use a Heap
# Don't use heap when:
# 1. You need sorted output of all elements → use sorted() (Timsort is faster)
# 2. You need to search for a specific element → heap is O(n) for search
# 3. You need min AND max with updates → consider sortedcontainers.SortedList
# 4. You're doing repeated access without modification → just use sorted list
# Use heap when:
# 1. Repeatedly extracting min/max while adding new elements
# 2. Top-K problems (k << n)
# 3. Merging K sorted sequences
# 4. Priority scheduling
# 5. Dijkstra's, Prim's, A* algorithms
# 6. Running median (two-heap)
6. Heap Property Verification
def is_min_heap(arr):
    """Verify array satisfies min-heap property"""
    n = len(arr)
    for i in range(n // 2):  # Only internal nodes have children
        left = 2 * i + 1
        right = 2 * i + 2
        if left < n and arr[i] > arr[left]:
            return False
        if right < n and arr[i] > arr[right]:
            return False
    return True

def is_max_heap(arr):
    """Verify array satisfies max-heap property"""
    n = len(arr)
    for i in range(n // 2):
        if 2 * i + 1 < n and arr[i] < arr[2 * i + 1]:
            return False
        if 2 * i + 2 < n and arr[i] < arr[2 * i + 2]:
            return False
    return True

print(is_min_heap([1, 3, 2, 6, 5, 4]))  # True
print(is_max_heap([9, 7, 8, 2, 4, 5]))  # True
print(is_min_heap([3, 1, 2]))  # False (3 > 1)
Practice Problems
Easy
| Problem | Pattern | Key Insight |
|---|---|---|
| Kth Largest Element in Array | Min-heap size k | Min-heap of size k; root = kth largest |
| Last Stone Weight | Max-heap | Smash two heaviest; push difference |
| K Closest Points to Origin | Max-heap size k | Keep k smallest distances |
| Sort Array by Increasing Frequency | Custom comparator | Build freq map, sort with heap by freq |
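As a warm-up, the "Last Stone Weight" row can be sketched as follows. This is a minimal illustration, not a reference solution: the function name `last_stone_weight` is an assumption, and the max-heap is simulated by negating weights because `heapq` is a min-heap.

```python
import heapq

def last_stone_weight(stones):
    """Return the weight of the final stone, or 0 if none remain."""
    # Store negated weights so the heaviest stone sits at the root.
    heap = [-s for s in stones]
    heapq.heapify(heap)
    while len(heap) > 1:
        heaviest = -heapq.heappop(heap)
        second = -heapq.heappop(heap)
        if heaviest != second:
            # Unequal stones leave a remainder that goes back on the heap.
            heapq.heappush(heap, -(heaviest - second))
    return -heap[0] if heap else 0

print(last_stone_weight([2, 7, 4, 1, 8, 1]))  # 1
```

Each round pops twice and pushes at most once, so the loop runs at most n - 1 times at O(log n) per round.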
Medium
| Problem | Pattern | Key Insight |
|---|---|---|
| Top K Frequent Elements | Min-heap size k | Frequency count + min-heap |
| K Closest Numbers to X in Sorted Array | Max-heap size k | Use abs difference as priority |
| Reorganize String | Max-heap greedy | Always place most frequent char next |
| Task Scheduler | Max-heap + queue | Greedy: always use most frequent task |
| Kth Smallest Element in Sorted Matrix | Min-heap | Start with row 0, expand by row |
| Design Twitter Feed (Top 10 tweets) | Min-heap + merge | Merge k sorted user tweet streams |
| Meeting Rooms II (Min rooms needed) | Min-heap | Track earliest end time |
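To make the "track earliest end time" insight from the Meeting Rooms II row concrete, here is a small sketch. The function name `min_meeting_rooms` and the `(start, end)` tuple format are assumptions for illustration; it also assumes a meeting ending at time t frees its room for one starting at t.

```python
import heapq

def min_meeting_rooms(intervals):
    """intervals: list of (start, end) pairs. Returns the number of rooms needed."""
    end_times = []  # min-heap of end times, one entry per room in use
    for start, end in sorted(intervals):
        # The root is the room that frees up earliest; reuse it if it is free.
        if end_times and end_times[0] <= start:
            heapq.heapreplace(end_times, end)
        else:
            heapq.heappush(end_times, end)
    return len(end_times)

print(min_meeting_rooms([(0, 30), (5, 10), (15, 20)]))  # 2
```

The heap never shrinks, so its final size equals the peak number of simultaneous meetings.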
Hard
| Problem | Pattern | Key Insight |
|---|---|---|
| Find Median from Data Stream | Two heaps | Max-heap left, min-heap right |
| Sliding Window Maximum | Heap + lazy delete | Max-heap with position tracking |
| Merge K Sorted Lists | Min-heap | Heap of (value, list_idx, elem_idx) |
| Smallest Range Covering Elements from K Lists | Min-heap | Track current range, expand from min side |
| The Skyline Problem | Max-heap | Process critical x-coordinates |
| IPO (Project Selection) | Two heaps | Max capital gain, min capital needed |
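The two-heap idea in the IPO row can be sketched like this. It is one possible arrangement under stated assumptions: the signature `find_maximized_capital(k, w, profits, capital)` is chosen for illustration, with `k` the maximum number of projects and `w` the starting capital.

```python
import heapq

def find_maximized_capital(k, w, profits, capital):
    """Pick at most k projects greedily to maximize final capital."""
    # Min-heap keyed on required capital: projects not yet affordable.
    locked = list(zip(capital, profits))
    heapq.heapify(locked)
    affordable = []  # max-heap of profits, stored negated
    for _ in range(k):
        # Unlock every project the current capital can pay for.
        while locked and locked[0][0] <= w:
            _, profit = heapq.heappop(locked)
            heapq.heappush(affordable, -profit)
        if not affordable:
            break  # nothing affordable, so capital cannot grow further
        w -= heapq.heappop(affordable)  # subtracting a negated profit adds it
    return w

print(find_maximized_capital(2, 0, [1, 2, 3], [0, 1, 1]))  # 4
```

Each project moves from the min-heap to the max-heap at most once, so the total work is O(n log n) regardless of k.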
Quick Reference
import heapq
# ─── Build ───────────────────────────────────────────────────
h = [] # Empty heap
heapq.heapify(lst) # Convert list to heap in-place O(n)
# ─── Insert ──────────────────────────────────────────────────
heapq.heappush(h, val) # Push value O(log n)
# ─── Access ──────────────────────────────────────────────────
h[0] # Peek min O(1)
heapq.heappop(h) # Pop min O(log n)
heapq.heappushpop(h, val) # Push then pop min O(log n)
heapq.heapreplace(h, val) # Pop min then push O(log n)
# ─── Utilities ───────────────────────────────────────────────
heapq.nlargest(k, iterable) # Top k largest O(n log k)
heapq.nsmallest(k, iterable) # Top k smallest O(n log k)
heapq.merge(*iterables) # Merge sorted O(n log k)
# ─── Max-Heap trick ──────────────────────────────────────────
heapq.heappush(h, -val) # Negate to simulate max-heap
max_val = -heapq.heappop(h) # Negate back when popping
# ─── Common patterns ─────────────────────────────────────────
# Kth largest: min-heap of size k → root is answer
# Kth smallest: max-heap of size k → root (negated) is answer
# Running median: max-heap (left) + min-heap (right)
# Merge K sorted: push (val, list_i, elem_i) → pop then push next from same list
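The quick reference lists `heappushpop` and `heapreplace` side by side, and the two are easy to confuse. The sketch below runs both on copies of the same heap so the difference is visible; the helper name `pushpop_vs_replace` is hypothetical.

```python
import heapq

def pushpop_vs_replace(values, new_value):
    """Apply both calls to separate copies of the same heap and compare."""
    a = list(values)
    b = list(values)
    heapq.heapify(a)
    heapq.heapify(b)
    # heappushpop pushes first, so it can hand new_value straight back.
    from_pushpop = heapq.heappushpop(a, new_value)
    # heapreplace pops first, so it always returns the old minimum.
    from_replace = heapq.heapreplace(b, new_value)
    return from_pushpop, from_replace, a, b

print(pushpop_vs_replace([3, 5, 8], 1))
# (1, 3, [3, 5, 8], [1, 5, 8])
```

When the new value is smaller than the current minimum, `heappushpop` leaves the heap unchanged, while `heapreplace` still evicts the old root. Note that `heapreplace` raises `IndexError` on an empty heap.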
