Две главные операции heap’а
После того как мы знаем структуру (complete binary tree в массиве) и инвариант (parent ≤ children), пора разобрать две главные операции, которые делают heap полезным:
- heappush(heap, value) — добавить элемент, сохранив инвариант. Через sift up.
- heappop(heap) — извлечь минимум, сохранив инвариант. Через sift down.
Обе работают за O(log n). Логика — изменения только вдоль одного пути от корня к листу или обратно.
heappush: sift up
Алгоритм:
- Добавить новый элемент в конец массива (это последний свободный слот — поддерживает complete tree).
- Sift up: сравнить с parent. Если меньше parent — поменять местами. Повторять, пока не дойдём до root или пока инвариант не восстановится.
def heappush(heap, value):
heap.append(value)
sift_up(heap, len(heap) - 1)
def sift_up(heap, i):
while i > 0:
parent_idx = (i - 1) // 2
if heap[i] < heap[parent_idx]:
# нарушение — поменять
heap[i], heap[parent_idx] = heap[parent_idx], heap[i]
i = parent_idx
else:
return
Пример:
было: [1, 3, 5, 7, 9, 6] валидный min-heap
heappush(heap, 2):
шаг 1: append -> [1, 3, 5, 7, 9, 6, 2] индекс новой 6
шаг 2: sift up i=6:
parent(6) = (6-1)//2 = 2, heap[2] = 5
2 < 5, swap:
[1, 3, 2, 7, 9, 6, 5]
i = 2
шаг 3: sift up i=2:
parent(2) = (2-1)//2 = 0, heap[0] = 1
2 > 1, stop
итог: [1, 3, 2, 7, 9, 6, 5]
Sift up делает максимум h = log n шагов вдоль одного пути. Каждый шаг O(1) (одно сравнение и swap). Общая сложность O(log n).
Новый элемент кладётся в конец, потом 'всплывает' вверх по дереву.
heappop: extract min + sift down
Самая хитрая часть — извлечь минимум, при этом не нарушив complete-структуру дерева. Алгоритм:
- Запомнить корень (это минимум).
- Переместить последний элемент в корень (чтобы остаться complete).
- Sift down: сравнить с детьми, swap с меньшим, повторять, пока инвариант не восстановится.
def heappop(heap):
last = heap.pop() # снять последний элемент
if not heap:
return last # heap был с одним элементом
result = heap[0] # запомнить минимум
heap[0] = last # положить last в корень
sift_down(heap, 0) # восстановить инвариант
return result
def sift_down(heap, i):
n = len(heap)
while True:
left = 2 * i + 1
right = 2 * i + 2
smallest = i
if left < n and heap[left] < heap[smallest]:
smallest = left
if right < n and heap[right] < heap[smallest]:
smallest = right
if smallest == i:
return # инвариант восстановлен
heap[i], heap[smallest] = heap[smallest], heap[i]
i = smallest
Пример:
было: [1, 3, 5, 7, 9, 6, 8] валидный min-heap, минимум = 1
heappop:
шаг 1: last = heap.pop() -> 8, heap = [1, 3, 5, 7, 9, 6]
шаг 2: result = heap[0] = 1
шаг 3: heap[0] = 8, heap = [8, 3, 5, 7, 9, 6]
шаг 4: sift_down i=0:
left=1 (3), right=2 (5), smallest=1 (val 3)
8 > 3, swap -> [3, 8, 5, 7, 9, 6], i=1
sift_down i=1:
left=3 (7), right=4 (9), smallest=3 (val 7)
8 > 7, swap -> [3, 7, 5, 8, 9, 6], i=3
sift_down i=3:
left=7, right=8 — оба out of bounds
smallest=i, stop
итог heap: [3, 7, 5, 8, 9, 6]
result = 1
Sift down тоже максимум O(log n) — путь от корня до листа.
Минимум забирается из root, последний элемент перемещается в root и 'опускается' вниз.
Почему sift down сравнивает с двумя детьми
Sift up прост — у узла один родитель, сравниваем с ним. Sift down сложнее — у узла два ребёнка, нужно найти меньшего и сравнить с ним. Если бы мы swap’или с произвольным ребёнком, могло бы нарушиться отношение child < other_child.
Пример где это важно:
heap [5, 3, 8] (root 5, дети 3 и 8)
Если sift down пошёл бы в правого ребёнка (5 vs 8 — не меняем, ок), но 5 > 3 (левый), мы оставили бы invariant нарушенным. Поэтому всегда swap с меньшим из двух детей.
# правильно
if left < n and heap[left] < heap[smallest]:
smallest = left
if right < n and heap[right] < heap[smallest]:
smallest = right
heapreplace: pop + push в одной операции
В стандартной библиотеке Python есть heapq.heapreplace(heap, item) — атомарная замена root’а:
import heapq
h = [1, 3, 5, 7]
result = heapq.heapreplace(h, 4)
# result = 1 (old min), h = [3, 4, 5, 7] (валидный heap с 4 вместо 1)
Это быстрее, чем heappop + heappush, потому что делает только один sift down вместо двух операций. Часто используется в top-K patterns:
def top_k(stream, k):
heap = []
for x in stream:
if len(heap) < k:
heapq.heappush(heap, x)
elif x > heap[0]:
heapq.heapreplace(heap, x) # одна операция вместо pop + push
return heap
Реализация heapq в Python — на C
Python heapq реализован на C (модуль _heapqmodule.c). Это даёт огромное ускорение по сравнению с Python-имплементацией:
import heapq
import time
n = 1_000_000
# heappush * n
h = []
start = time.perf_counter()
for i in range(n):
heapq.heappush(h, n - i)
t_c = time.perf_counter() - start
# теперь Python-имплементация
def my_heappush(heap, value):
heap.append(value)
i = len(heap) - 1
while i > 0:
p = (i - 1) // 2
if heap[i] < heap[p]:
heap[i], heap[p] = heap[p], heap[i]
i = p
else:
return
h2 = []
start = time.perf_counter()
for i in range(n):
my_heappush(h2, n - i)
t_py = time.perf_counter() - start
print(f"heapq (C): {t_c*1000:.0f} ms")
print(f"Python pure: {t_py*1000:.0f} ms")
print(f"ratio: {t_py/t_c:.0f}x")
Типично: heapq (C) ~150 ms, Python ~3000 ms. В 20× быстрее. Это и есть причина, почему всегда нужно использовать стандартный heapq, не свой.
Сложность операций
Все стандартные операции O(log n) или O(1).
Применения в DE
Task scheduler
import heapq
tasks = [] # heap из (priority, task_id, task)
heapq.heappush(tasks, (1, 'task_1', 'high priority'))
heapq.heappush(tasks, (3, 'task_2', 'low priority'))
heapq.heappush(tasks, (2, 'task_3', 'medium'))
while tasks:
priority, task_id, task = heapq.heappop(tasks)
print(f"Running {task_id} (priority {priority}): {task}")
Зачем нужен task_id во второй позиции tuple’а? Чтобы избежать сравнения объектов с одинаковым приоритетом — tuple сравнивается лексикографически, и если priority равны, Python начнёт сравнивать следующие поля. task_id (уникальный) гарантирует, что comparison не упрётся в сравнение dict’ов или custom объектов.
Top-K из стрима — реальный размер
Допустим, у вас 100 миллионов событий, нужно top-1000:
import heapq
import random
def top_k(stream, k):
heap = []
for value in stream:
if len(heap) < k:
heapq.heappush(heap, value)
elif value > heap[0]:
heapq.heapreplace(heap, value)
return sorted(heap, reverse=True)
# симулируем стрим
stream = (random.random() for _ in range(100_000_000))
top_1000 = top_k(stream, 1000)
Память: O(1000) против O(100M) для full sort. Время: O(N log K) = O(N log 1000) ≈ O(10 N).
K-way merge через heap
import heapq
# 4 отсортированных файла
files = [
open('sorted1.txt'),
open('sorted2.txt'),
open('sorted3.txt'),
open('sorted4.txt'),
]
# heapq.merge принимает iterators, возвращает iterator
with open('output.txt', 'w') as out:
for line in heapq.merge(*files):
out.write(line)
for f in files:
f.close()
Memory O(K) — только 4 «текущих» строки в heap’е. Это external sort: вы могли бы отсортировать терабайтный файл при 1GB RAM, разбив на куски, отсортировав каждый кусок в RAM, сохранив на диск, потом слив через heapq.merge.
Попробуй сам
import heapq
# 1. heappush + heappop последовательно — это HeapSort
data = [5, 3, 8, 1, 9, 2, 7]
h = []
for x in data:
heapq.heappush(h, x)
result = []
while h:
result.append(heapq.heappop(h))
print(f"Sorted via heap: {result}") # [1, 2, 3, 5, 7, 8, 9]
# 2. Реализуйте собственный sift_up и сравните с heapq
def my_push(heap, value):
heap.append(value)
i = len(heap) - 1
while i > 0:
p = (i - 1) // 2
if heap[i] < heap[p]:
heap[i], heap[p] = heap[p], heap[i]
i = p
else:
return
# 3. heapreplace vs pop + push на 100K операций
import time
n = 100_000
h1 = list(range(n))
heapq.heapify(h1)
start = time.perf_counter()
for i in range(n, n + n):
heapq.heappop(h1)
heapq.heappush(h1, i)
t_pop_push = time.perf_counter() - start
h2 = list(range(n))
heapq.heapify(h2)
start = time.perf_counter()
for i in range(n, n + n):
heapq.heapreplace(h2, i)
t_replace = time.perf_counter() - start
print(f"\npop + push: {t_pop_push*1000:.0f} ms")
print(f"heapreplace: {t_replace*1000:.0f} ms")
print(f"ratio: {t_pop_push/t_replace:.1f}x")
# heapreplace обычно 1.5-2x быстрее
# 4. Task scheduler с приоритетом
import itertools
class TaskQueue:
def __init__(self):
self.heap = []
self.counter = itertools.count() # уникальный id для tie-breaker
def push(self, priority, task):
heapq.heappush(self.heap, (priority, next(self.counter), task))
def pop(self):
priority, _, task = heapq.heappop(self.heap)
return priority, task
q = TaskQueue()
q.push(3, "send email")
q.push(1, "process payment")
q.push(2, "update cache")
q.push(1, "validate input") # same priority, разные task'и
while q.heap:
p, t = q.pop()
print(f"priority {p}: {t}")
# 1: process payment
# 1: validate input
# 2: update cache
# 3: send email
Не пишите свой heap на Python. heapq в стандартной библиотеке реализован на C и в 20+ раз быстрее любой ручной имплементации. Используйте heappush, heappop, heapreplace, heapify, heappushpop из этого модуля. Если нужен max-heap — кладите отрицания.
В следующем уроке разберём удивительный факт: heapify работает за O(n), а не O(n log n) — почему и как это используется на практике.