Удивительный факт: heapify O(n)
Представьте: у вас массив из миллиона элементов, нужно сделать из него heap. Два способа:
Способ 1 — heappush N раз:
import heapq
h = []
for x in data:
heapq.heappush(h, x)
Сложность: N вставок × O(log N) = O(N log N).
Способ 2 — heapify:
data_copy = data[:]
heapq.heapify(data_copy)
Сложность: O(N).
Это удивляет: ведь heapify должна делать «больше работы», чем одна heappush. И большой массив требует обработки большего числа узлов. Но в сумме получается линейная сложность.
Этот урок — про почему так.
Что делает heapify
heapify превращает произвольный массив в heap «на месте». Алгоритм:
- Идти от середины массива к началу (от
n//2 - 1до 0). - Для каждой позиции выполнить sift down.
Почему от середины? Потому что узлы в массиве с индексом >= n // 2 — это листья (у них нет детей). С листьями ничего делать не надо, инвариант на них тривиально выполнен. Начинаем с последнего узла, который имеет хотя бы одного ребёнка, и идём вверх.
def heapify(arr):
n = len(arr)
# начинаем с последнего узла, имеющего детей
for i in range(n // 2 - 1, -1, -1):
sift_down(arr, i, n)
def sift_down(arr, i, n):
while True:
l = 2 * i + 1
r = 2 * i + 2
smallest = i
if l < n and arr[l] < arr[smallest]:
smallest = l
if r < n and arr[r] < arr[smallest]:
smallest = r
if smallest == i:
return
arr[i], arr[smallest] = arr[smallest], arr[i]
i = smallest
Доказательство O(n): сумма высот
Наивный анализ: «делаем sift down для n/2 узлов, каждый стоит O(log n), значит O(n log n)». Это верхняя оценка, но не tight.
Точный анализ: sift down для узла на высоте h стоит O(h), не O(log n). А не все узлы — на максимальной высоте.
Распределение узлов по высотам в complete tree из n узлов:
- Высота 0 (листья): ~n/2 узлов
- Высота 1: ~n/4 узлов
- Высота 2: ~n/8 узлов
- …
- Высота h: ~n/2^(h+1) узлов
- Высота log n (root): 1 узел
Стоимость sift down для одного узла на высоте h: не больше h (sift down движется максимум на h уровней вниз).
Общая стоимость heapify:
T(n) = сумма по всем высотам h от 0 до log n:
(число узлов на высоте h) * h
= сумма n/2^(h+1) * h
= (n/2) * сумма h / 2^h (от h=0 до log n)
Сумма h / 2^h сходится к 2 (это известный ряд). Значит:
T(n) = (n/2) * O(1) = O(n)
Получается линейная сложность, хотя интуитивно кажется, что должно быть O(n log n).
Половина узлов — листья (h=0, sift down тривиален). Только один узел — root (h=log n).
Интуиция: где работа сосредоточена
Главная интуиция: большинство узлов в complete tree — близко к листьям. На последнем уровне ~n/2 узлов, и они вообще не делают работы (sift down с высотой 0). На предпоследнем — n/4, и они делают только 1 swap максимум.
Только малая доля узлов (близких к root) делает существенную работу. Поэтому средняя стоимость на узел — константа, и общая работа O(n).
Эксперимент: heapify vs heappush’и
import heapq
import time
import random
n = 1_000_000
# подготовим случайные данные
data = list(range(n))
random.shuffle(data)
# 1. N heappush'ей
data_copy = data[:]
h = []
start = time.perf_counter()
for x in data_copy:
heapq.heappush(h, x)
t_push = time.perf_counter() - start
# 2. heapify
data_copy = data[:]
start = time.perf_counter()
heapq.heapify(data_copy)
t_heapify = time.perf_counter() - start
print(f"N heappush'ей: {t_push*1000:.0f} ms")
print(f"heapify: {t_heapify*1000:.0f} ms")
print(f"ratio: {t_push/t_heapify:.1f}x")
Типичные результаты:
N heappush'ей: 130 ms
heapify: 15 ms
ratio: 8.7x
heapify в ~9 раз быстрее N heappush’ей на 1M элементов. На больших n разница растёт логарифмически.
Это не теоретическая разница — реальное практическое ускорение для DE-сценариев.
Когда использовать heapify
heapify полезен, когда у вас уже есть массив и нужно превратить его в heap. Типичные сценарии:
1. Top-K на конечной коллекции
import heapq
values = [...] # 10M значений
k = 100
# подход 1: heappush + контроль размера
heap = []
for x in values:
if len(heap) < k:
heapq.heappush(heap, x)
elif x > heap[0]:
heapq.heapreplace(heap, x)
# подход 2: nlargest (использует heapify внутри для крайних случаев)
top = heapq.nlargest(k, values)
# подход 3: heapify + extract
import heapq
neg = [-x for x in values]
heapq.heapify(neg)
top = [-heapq.heappop(neg) for _ in range(k)]
Для k маленький относительно n, подход 1 лучше: O(n log k). Для k большой — подход 3 через heapify+extract: O(n + k log n).
2. Heap из существующих данных
import heapq
records = [...] # 1M записей
heap = [(rec.priority, rec) for rec in records]
heapq.heapify(heap) # O(n) — намного быстрее N heappush'ей
Это в типичном DE-pipeline: загрузить данные, сделать heap, обрабатывать через extract-min.
3. heapsort
import heapq
def heapsort(arr):
heapq.heapify(arr)
result = []
while arr:
result.append(heapq.heappop(arr))
return result
# O(n) heapify + O(n log n) extracts = O(n log n)
Heapsort работает за O(n log n) — heapify O(n), потом n pop’ов по O(log n).
Pythonic: nlargest / nsmallest
heapq предоставляет два удобных метода для top-K:
import heapq
data = [random.randint(0, 1_000_000) for _ in range(1_000_000)]
# top-10 maximum
top = heapq.nlargest(10, data)
# bottom-5 minimum
bottom = heapq.nsmallest(5, data)
Внутри они используют разные стратегии в зависимости от k и n:
- Если k очень маленький (< n/2): heap размера k.
- Если k большой: heapify + n-k extracts.
Это production-ready инструменты для top-K. Используйте их вместо ручных имплементаций.
# вместо
def my_top_k(data, k):
heap = []
for x in data:
if len(heap) < k:
heapq.heappush(heap, x)
elif x > heap[0]:
heapq.heapreplace(heap, x)
return sorted(heap, reverse=True)
# проще
top_k = heapq.nlargest(k, data)
Сравнение всех способов построить heap
heapify оптимален, когда данные уже в массиве. heappush — для streaming.
Запомнить: если есть массив — heapify. Если стрим — heappush по одному.
Когда O(n) heapify не помогает
heapify даёт O(n) только если массив уже есть в памяти. Для streaming-сценария (данные приходят по одному), heapify невозможен — приходится делать N heappush’ей.
В этом случае имеет смысл накапливать данные в буфер, потом периодически heapify. Например:
def streaming_top_k(stream, k, batch_size=10_000):
"""Top-K через буферизацию и periodic heapify."""
heap = []
buffer = []
for x in stream:
buffer.append(x)
if len(buffer) >= batch_size:
# все из буфера в массив, потом heapify, потом возьмём top-k
buffer.extend(heap)
heapq.heapify(buffer)
heap = heapq.nlargest(k, buffer)
buffer = []
if buffer:
buffer.extend(heap)
heapq.heapify(buffer)
heap = heapq.nlargest(k, buffer)
return heap
В батчах применяем O(n) heapify, между батчами — мизерный overhead.
Попробуй сам
import heapq
import time
import random
# 1. Сравните heapify vs N heappush на разных n
for n in [10_000, 100_000, 1_000_000]:
data = list(range(n))
random.shuffle(data)
# heapify
d1 = data[:]
start = time.perf_counter()
heapq.heapify(d1)
t_heapify = time.perf_counter() - start
# N heappush
d2 = []
start = time.perf_counter()
for x in data:
heapq.heappush(d2, x)
t_push = time.perf_counter() - start
print(f"n={n:>8d}: heapify={t_heapify*1000:>6.1f}ms, heappush={t_push*1000:>7.1f}ms, ratio={t_push/t_heapify:>4.1f}x")
# 2. Реализуйте свой heapify и проверьте корректность
def my_heapify(arr):
n = len(arr)
for i in range(n // 2 - 1, -1, -1):
# sift down
j = i
while True:
l = 2 * j + 1
r = 2 * j + 2
smallest = j
if l < n and arr[l] < arr[smallest]:
smallest = l
if r < n and arr[r] < arr[smallest]:
smallest = r
if smallest == j:
break
arr[j], arr[smallest] = arr[smallest], arr[j]
j = smallest
# проверка
def is_min_heap(arr):
for i in range(len(arr)):
l, r = 2 * i + 1, 2 * i + 2
if l < len(arr) and arr[i] > arr[l]:
return False
if r < len(arr) and arr[i] > arr[r]:
return False
return True
data = [9, 5, 3, 7, 1, 8, 2, 6, 4]
my_heapify(data)
print(f"\nMy heapify: {data}")
print(f"Is valid: {is_min_heap(data)}")
# 3. nlargest vs ручная top-k
data = [random.randint(0, 1_000_000) for _ in range(1_000_000)]
start = time.perf_counter()
top_python = heapq.nlargest(100, data)
t_nlargest = time.perf_counter() - start
start = time.perf_counter()
heap = []
for x in data:
if len(heap) < 100:
heapq.heappush(heap, x)
elif x > heap[0]:
heapq.heapreplace(heap, x)
top_manual = sorted(heap, reverse=True)
t_manual = time.perf_counter() - start
print(f"\nnlargest(100): {t_nlargest*1000:.0f} ms")
print(f"manual heap+replace: {t_manual*1000:.0f} ms")
# обычно: nlargest немного быстрее благодаря C-имплементации
Запомните: heapify за O(n) — практический выигрыш, не теоретический трюк. На 1M элементов разница между heapify и N heappush’ей — 100+ мс. Если у вас уже есть массив для heap’а, ВСЕГДА используйте heapify, а не цикл с heappush. Для top-K на готовой коллекции — heapq.nlargest / nsmallest, они оптимально выбирают стратегию.
В последнем уроке этого модуля — применения heap в DE: Dijkstra (упомянем), task scheduling, k-way merge для external sort, top-K из окон.