Learning Platform
Глоссарий Troubleshooting
Урок 12.04 · 28 мин
Начальный
heapifyamortized analysissum of heightsheap construction

Удивительный факт: heapify O(n)

Представьте: у вас массив из миллиона элементов, нужно сделать из него heap. Два способа:

Способ 1 — heappush N раз:

import heapq
h = []
for x in data:
    heapq.heappush(h, x)

Сложность: N вставок × O(log N) = O(N log N).

Способ 2 — heapify:

data_copy = data[:]
heapq.heapify(data_copy)

Сложность: O(N).

Это удивляет: ведь heapify должна делать «больше работы», чем одна heappush. И большой массив требует обработки большего числа узлов. Но в сумме получается линейная сложность.

Этот урок — про почему так.

Что делает heapify

heapify превращает произвольный массив в heap «на месте». Алгоритм:

  1. Идти от середины массива к началу (от n//2 - 1 до 0).
  2. Для каждой позиции выполнить sift down.

Почему от середины? Потому что узлы в массиве с индексом >= n // 2 — это листья (у них нет детей). С листьями ничего делать не надо, инвариант на них тривиально выполнен. Начинаем с последнего узла, который имеет хотя бы одного ребёнка, и идём вверх.

def heapify(arr):
    n = len(arr)
    # начинаем с последнего узла, имеющего детей
    for i in range(n // 2 - 1, -1, -1):
        sift_down(arr, i, n)

def sift_down(arr, i, n):
    while True:
        l = 2 * i + 1
        r = 2 * i + 2
        smallest = i
        if l < n and arr[l] < arr[smallest]:
            smallest = l
        if r < n and arr[r] < arr[smallest]:
            smallest = r
        if smallest == i:
            return
        arr[i], arr[smallest] = arr[smallest], arr[i]
        i = smallest

Доказательство O(n): сумма высот

Наивный анализ: «делаем sift down для n/2 узлов, каждый стоит O(log n), значит O(n log n)». Это верхняя оценка, но не tight.

Точный анализ: sift down для узла на высоте h стоит O(h), не O(log n). А не все узлы — на максимальной высоте.

Распределение узлов по высотам в complete tree из n узлов:

  • Высота 0 (листья): ~n/2 узлов
  • Высота 1: ~n/4 узлов
  • Высота 2: ~n/8 узлов
  • Высота h: ~n/2^(h+1) узлов
  • Высота log n (root): 1 узел

Стоимость sift down для одного узла на высоте h: не больше h (sift down движется максимум на h уровней вниз).

Общая стоимость heapify:

T(n) = сумма по всем высотам h от 0 до log n:
       (число узлов на высоте h) * h
     = сумма n/2^(h+1) * h
     = (n/2) * сумма h / 2^h     (от h=0 до log n)

Сумма h / 2^h сходится к 2 (это известный ряд). Значит:

T(n) = (n/2) * O(1) = O(n)

Получается линейная сложность, хотя интуитивно кажется, что должно быть O(n log n).

Распределение узлов по высотам

Половина узлов — листья (h=0, sift down тривиален). Только один узел — root (h=log n).

высота h
0 (листья)~n/2 узлов, sift down цена 0у листьев нет детей, ничего не делать
1~n/4 узлов, цена ≤ 1 каждый
2~n/8 узлов, цена ≤ 2 каждый
3~n/16 узлов, цена ≤ 3 каждый
......
log n (root)1 узел, цена ≤ log n
итогO(n)сумма n/2^h * h сходится

Интуиция: где работа сосредоточена

Главная интуиция: большинство узлов в complete tree — близко к листьям. На последнем уровне ~n/2 узлов, и они вообще не делают работы (sift down с высотой 0). На предпоследнем — n/4, и они делают только 1 swap максимум.

Только малая доля узлов (близких к root) делает существенную работу. Поэтому средняя стоимость на узел — константа, и общая работа O(n).

Эксперимент: heapify vs heappush’и

import heapq
import time
import random

n = 1_000_000

# подготовим случайные данные
data = list(range(n))
random.shuffle(data)

# 1. N heappush'ей
data_copy = data[:]
h = []
start = time.perf_counter()
for x in data_copy:
    heapq.heappush(h, x)
t_push = time.perf_counter() - start

# 2. heapify
data_copy = data[:]
start = time.perf_counter()
heapq.heapify(data_copy)
t_heapify = time.perf_counter() - start

print(f"N heappush'ей: {t_push*1000:.0f} ms")
print(f"heapify:       {t_heapify*1000:.0f} ms")
print(f"ratio: {t_push/t_heapify:.1f}x")

Типичные результаты:

N heappush'ей: 130 ms
heapify:       15 ms
ratio: 8.7x

heapify в ~9 раз быстрее N heappush’ей на 1M элементов. На больших n разница растёт логарифмически.

Это не теоретическая разница — реальное практическое ускорение для DE-сценариев.

Когда использовать heapify

heapify полезен, когда у вас уже есть массив и нужно превратить его в heap. Типичные сценарии:

1. Top-K на конечной коллекции

import heapq

values = [...]   # 10M значений
k = 100

# подход 1: heappush + контроль размера
heap = []
for x in values:
    if len(heap) < k:
        heapq.heappush(heap, x)
    elif x > heap[0]:
        heapq.heapreplace(heap, x)

# подход 2: nlargest (использует heapify внутри для крайних случаев)
top = heapq.nlargest(k, values)

# подход 3: heapify + extract
import heapq
neg = [-x for x in values]
heapq.heapify(neg)
top = [-heapq.heappop(neg) for _ in range(k)]

Для k маленький относительно n, подход 1 лучше: O(n log k). Для k большой — подход 3 через heapify+extract: O(n + k log n).

2. Heap из существующих данных

import heapq

records = [...]    # 1M записей
heap = [(rec.priority, rec) for rec in records]
heapq.heapify(heap)    # O(n) — намного быстрее N heappush'ей

Это в типичном DE-pipeline: загрузить данные, сделать heap, обрабатывать через extract-min.

3. heapsort

import heapq

def heapsort(arr):
    heapq.heapify(arr)
    result = []
    while arr:
        result.append(heapq.heappop(arr))
    return result

# O(n) heapify + O(n log n) extracts = O(n log n)

Heapsort работает за O(n log n) — heapify O(n), потом n pop’ов по O(log n).

Pythonic: nlargest / nsmallest

heapq предоставляет два удобных метода для top-K:

import heapq

data = [random.randint(0, 1_000_000) for _ in range(1_000_000)]

# top-10 maximum
top = heapq.nlargest(10, data)

# bottom-5 minimum  
bottom = heapq.nsmallest(5, data)

Внутри они используют разные стратегии в зависимости от k и n:

  • Если k очень маленький (< n/2): heap размера k.
  • Если k большой: heapify + n-k extracts.

Это production-ready инструменты для top-K. Используйте их вместо ручных имплементаций.

# вместо
def my_top_k(data, k):
    heap = []
    for x in data:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
    return sorted(heap, reverse=True)

# проще
top_k = heapq.nlargest(k, data)

Сравнение всех способов построить heap

Способы построить heap из n элементов

heapify оптимален, когда данные уже в массиве. heappush — для streaming.

способ
heapify(arr)O(n)лучший, когда массив уже есть
N heappush'ейO(n log n)когда данные приходят по одному
sort + взять как heapO(n log n)отсортированный массив — это heap, но избыточно
heap.append + sift_up * Nэквивалент heappush

Запомнить: если есть массив — heapify. Если стрим — heappush по одному.

Когда O(n) heapify не помогает

heapify даёт O(n) только если массив уже есть в памяти. Для streaming-сценария (данные приходят по одному), heapify невозможен — приходится делать N heappush’ей.

В этом случае имеет смысл накапливать данные в буфер, потом периодически heapify. Например:

def streaming_top_k(stream, k, batch_size=10_000):
    """Top-K через буферизацию и periodic heapify."""
    heap = []
    buffer = []
    
    for x in stream:
        buffer.append(x)
        if len(buffer) >= batch_size:
            # все из буфера в массив, потом heapify, потом возьмём top-k
            buffer.extend(heap)
            heapq.heapify(buffer)
            heap = heapq.nlargest(k, buffer)
            buffer = []
    
    if buffer:
        buffer.extend(heap)
        heapq.heapify(buffer)
        heap = heapq.nlargest(k, buffer)
    
    return heap

В батчах применяем O(n) heapify, между батчами — мизерный overhead.

Попробуй сам

import heapq
import time
import random

# 1. Сравните heapify vs N heappush на разных n
for n in [10_000, 100_000, 1_000_000]:
    data = list(range(n))
    random.shuffle(data)
    
    # heapify
    d1 = data[:]
    start = time.perf_counter()
    heapq.heapify(d1)
    t_heapify = time.perf_counter() - start
    
    # N heappush
    d2 = []
    start = time.perf_counter()
    for x in data:
        heapq.heappush(d2, x)
    t_push = time.perf_counter() - start
    
    print(f"n={n:>8d}: heapify={t_heapify*1000:>6.1f}ms, heappush={t_push*1000:>7.1f}ms, ratio={t_push/t_heapify:>4.1f}x")

# 2. Реализуйте свой heapify и проверьте корректность
def my_heapify(arr):
    n = len(arr)
    for i in range(n // 2 - 1, -1, -1):
        # sift down
        j = i
        while True:
            l = 2 * j + 1
            r = 2 * j + 2
            smallest = j
            if l < n and arr[l] < arr[smallest]:
                smallest = l
            if r < n and arr[r] < arr[smallest]:
                smallest = r
            if smallest == j:
                break
            arr[j], arr[smallest] = arr[smallest], arr[j]
            j = smallest

# проверка
def is_min_heap(arr):
    for i in range(len(arr)):
        l, r = 2 * i + 1, 2 * i + 2
        if l < len(arr) and arr[i] > arr[l]:
            return False
        if r < len(arr) and arr[i] > arr[r]:
            return False
    return True

data = [9, 5, 3, 7, 1, 8, 2, 6, 4]
my_heapify(data)
print(f"\nMy heapify: {data}")
print(f"Is valid:  {is_min_heap(data)}")

# 3. nlargest vs ручная top-k
data = [random.randint(0, 1_000_000) for _ in range(1_000_000)]

start = time.perf_counter()
top_python = heapq.nlargest(100, data)
t_nlargest = time.perf_counter() - start

start = time.perf_counter()
heap = []
for x in data:
    if len(heap) < 100:
        heapq.heappush(heap, x)
    elif x > heap[0]:
        heapq.heapreplace(heap, x)
top_manual = sorted(heap, reverse=True)
t_manual = time.perf_counter() - start

print(f"\nnlargest(100):       {t_nlargest*1000:.0f} ms")
print(f"manual heap+replace: {t_manual*1000:.0f} ms")
# обычно: nlargest немного быстрее благодаря C-имплементации
TIP

Запомните: heapify за O(n) — практический выигрыш, не теоретический трюк. На 1M элементов разница между heapify и N heappush’ей — 100+ мс. Если у вас уже есть массив для heap’а, ВСЕГДА используйте heapify, а не цикл с heappush. Для top-K на готовой коллекции — heapq.nlargest / nsmallest, они оптимально выбирают стратегию.

В последнем уроке этого модуля — применения heap в DE: Dijkstra (упомянем), task scheduling, k-way merge для external sort, top-K из окон.

Проверка знанийKnowledge check
Heapify работает за O(n), а N последовательных heappush'ей — за O(n log n). Объясните, почему наивная оценка 'каждый узел делает sift down за O(log n), всего n узлов, значит O(n log n)' неверна для heapify.
ОтветAnswer
Наивная оценка завышена, потому что она использует worst-case стоимость sift down (O(log n) — путь от root до листа), но на самом деле большинство узлов в complete tree находятся БЛИЗКО К ЛИСТЬЯМ, и их sift down дешёвый. Распределение узлов по высотам: на высоте 0 (листья) ~n/2 узлов — их sift down занимает 0 шагов; на высоте 1 — ~n/4 узлов, каждый делает максимум 1 шаг; на высоте 2 — ~n/8 узлов, по 2 шага; и так далее. На высоте h находится ~n/2^(h+1) узлов, каждый требует ≤ h шагов. Сумма по всем высотам: n/2 * 0 + n/4 * 1 + n/8 * 2 + n/16 * 3 + ... = (n/2) * сумма(h/2^h). Этот ряд сходится к 2, поэтому общая работа — O(n), а не O(n log n). Контраст с heappush: каждый push добавляет ОДИН элемент в КОНЕЦ (потенциально на самую низкую высоту), и его sift up может пройти весь путь к root за O(log n). Поэтому N push'ей дают n * O(log n) = O(n log n). Heapify обходит дерево снизу вверх, амортизируя стоимость через распределение, push добавляет 'по одному' без амортизации.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 6. Какова сложность heapify (превращение произвольного массива в heap)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5