Learning Platform
Глоссарий Troubleshooting
Урок 19.04 · 25 мин
Начальный
top-kmin-heapheapqnlargeststreamingpriority-queue

Задача: найти top-K в потоке

Классическая задача DE: «найти 100 самых активных пользователей за день», «топ-10 самых медленных запросов», «100 страниц с наибольшим CTR». Везде задача одна: из n значений найти k наибольших (или наименьших).

Наивное решение — полная сортировка и взять первые k:

# O(n log n) — сортируем всё, берём верхушку
top_100 = sorted(events, key=lambda x: x.score, reverse=True)[:100]

На миллионе записей это работает, но избыточно: мы сортируем 999900 элементов, которые нам не нужны. Существует решение за O(n log k) — работает быстрее, экономит память.

Min-heap размера K

Идея: храним k наибольших элементов в min-heap размера k. Корень heap — самое маленькое из k наибольших. При обработке нового элемента:

  1. Если новый > root -> root заменяем на новый, восстанавливаем heap-инвариант.
  2. Иначе -> пропускаем.

После прохода всех n элементов в heap остаются k наибольших. Сложность: каждый элемент — O(log k) сравнение/вставка, всего n элементов = O(n log k).

import heapq

def top_k(items, k):
    heap = []
    for x in items:
        if len(heap) < k:
            heapq.heappush(heap, x)              # пока не заполнили
        elif x > heap[0]:
            heapq.heapreplace(heap, x)           # вытеснили root
    return sorted(heap, reverse=True)            # для красоты, не обязательно

items = [3, 7, 1, 9, 5, 8, 2, 6, 4]
print(top_k(items, 3))   # [9, 8, 7]
Top-3 через min-heap размера 3

Каждый новый элемент сравнивается с корнем heap (минимум). Если больше — вытесняет.

вход3первый элемент
heap[3]заполняем
вход7второй
heap[3, 7]ещё не полный
вход1третий
heap[1, 7, 3]размер 3, root=1 (минимум)
вход9больше root=1
heap[3, 7, 9]вытеснили 1, новый root=3
вход5больше root=3
heap[5, 7, 9]вытеснили 3, новый root=5
вход2меньше root=5
heap[5, 7, 9]пропускаем — 2 не в top-3
итогtop-3 = [9, 8, 7]после прохода всех элементов

Каждый элемент обрабатывается за O(log k) — это очень быстро при маленьком k. Если k=100, log k = 7 — практически 7 сравнений на элемент.

heapq.nlargest — готовое решение

В Python есть встроенная функция:

import heapq

items = [3, 7, 1, 9, 5, 8, 2, 6, 4]
print(heapq.nlargest(3, items))   # [9, 8, 7]

Под капотом — тот же алгоритм. Работает на любом итераторе:

# top-3 пар (user_id, score)
events = [
    ("alice", 100),
    ("bob", 200),
    ("carol", 50),
    ("dave", 300),
]
top3 = heapq.nlargest(3, events, key=lambda x: x[1])
print(top3)   # [('dave', 300), ('bob', 200), ('alice', 100)]

heapq.nsmallest(k, items) — наоборот, k наименьших.

Сравнение с полной сортировкой

подходвремяпамятькогда лучше
sorted(items)[:k]O(n log n)O(n)k близко к n, нужна полная сортировка
heapq.nlargest(k, items)O(n log k)O(k)k много меньше n (стандартный случай)
numpy.argpartitionO(n)O(n)очень большие массивы из чисел

При k=10 и n=10^6: sorted делает 20 миллионов сравнений, heapq.nlargest — ~10^7 сравнений (быстрее в 2x) и не требует хранить весь массив в памяти.

Главное преимущество heapq — потоковая обработка. Можно скармливать генератор:

def event_stream():
    for line in open("events.log"):
        yield parse(line)

# top-100 без загрузки всех событий в RAM
top_100 = heapq.nlargest(100, event_stream(), key=lambda x: x.score)

Это работает с памятью O(100), независимо от размера файла.

Top-K по ключу с подсчётом

Часто нужно top-K не по сырому значению, а после агрегации. «Топ-10 пользователей по числу событий»:

from collections import Counter
import heapq

events = [...]   # миллион событий

# вариант 1: Counter + most_common
counts = Counter(e.user_id for e in events)
top10 = counts.most_common(10)   # heapq.nlargest внутри

# вариант 2: явно с heapq
counts = Counter(e.user_id for e in events)
top10 = heapq.nlargest(10, counts.items(), key=lambda x: x[1])

Counter.most_common(k) — это обёртка над heapq.nlargest, ничем не отличается. Используйте удобный.

Streaming top-K

Если данные поступают потоком и k фиксированный, держим heap размера k в памяти и обновляем на каждое событие:

import heapq

class StreamingTopK:
    def __init__(self, k):
        self.k = k
        self.heap = []      # min-heap

    def add(self, item, score):
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, (score, item))
        elif score > self.heap[0][0]:
            heapq.heapreplace(self.heap, (score, item))

    def top(self):
        return sorted(self.heap, reverse=True)

st = StreamingTopK(3)
for item, score in [("a", 5), ("b", 10), ("c", 3), ("d", 8), ("e", 12), ("f", 1)]:
    st.add(item, score)

print(st.top())   # [(12, 'e'), (10, 'b'), (8, 'd')]

На каждое событие — O(log k). На миллиарде событий с k=100 это 10^9 * log(100) ~ 7 * 10^9 операций. Несколько минут CPU на одно ядро.

Top-K с обновлениями

Сложнее, если ключи могут обновляться. Например, score пользователя растёт. Простой heap не подходит — не умеет «обновить элемент с конкретным id».

Решение — dict + SortedList (см. модуль 16):

from sortedcontainers import SortedList

class TopKDynamic:
    def __init__(self, k):
        self.k = k
        self.scores = {}                 # user -> score
        self.sorted = SortedList()       # (score, user)

    def update(self, user, new_score):
        if user in self.scores:
            old = self.scores[user]
            self.sorted.remove((old, user))   # O(log n)
        self.scores[user] = new_score
        self.sorted.add((new_score, user))    # O(log n)

    def top(self):
        return [u for _, u in self.sorted[-self.k:]]

dict — для быстрого «текущий score пользователя». SortedList — для быстрого top-K. Каждая операция O(log n) — приемлемо для большинства задач.

В Redis Sorted Set (ZSET) реализует ровно это: skip list + hash для O(log n) update и O(log n + k) top-K. Используется во всех leaderboard-системах.

TIP

Когда вы видите задачу ‘найти k лучших из большого набора’ — первый рефлекс должен быть heapq.nlargest. Полная сортировка тратит время на 99 percent элементов, которые вам не нужны. Это самая частая микро-оптимизация в дата-инженерии.

Min-heap vs max-heap

Python heapq — это min-heap: корень — минимум. Поэтому top-K наибольших реализуется через min-heap размера k (root = самый маленький из k наибольших).

Если нужен max-heap (для top-K наименьших или явного top() — самый большой):

# трюк: храним отрицательные значения
heap = []
heapq.heappush(heap, -x)    # вставка отрицательного
max_value = -heapq.heappop(heap)

Или через тупл с инверсией ключа:

heapq.heappush(heap, (-score, item))   # отрицательный score

heapq.nlargest уже учитывает это внутри.

Замеры

Сравним sorted vs heapq.nlargest на 1 миллионе:

import random
import time
import heapq

n = 1_000_000
items = [random.randint(0, n) for _ in range(n)]
k = 100

t0 = time.time()
top_sort = sorted(items, reverse=True)[:k]
t_sort = time.time() - t0

t0 = time.time()
top_heap = heapq.nlargest(k, items)
t_heap = time.time() - t0

assert top_sort == top_heap
print(f"sorted: {t_sort*1000:.1f} ms")
print(f"nlargest: {t_heap*1000:.1f} ms")
print(f"speedup: {t_sort/t_heap:.1f}x")

Типичный результат: sorted ~150 мс, nlargest ~80 мс. Ускорение около 2x. На k=10 разница ещё больше.

Попробуй сам

Streaming top-K с предметной задачей: «топ-10 IP-адресов по числу запросов в логе»:

from collections import Counter
import heapq
import random

# имитация log: 100k запросов
log = [f"10.0.{random.randint(0,255)}.{random.randint(0,255)}" for _ in range(100_000)]

# через Counter
counts = Counter(log)
top10 = counts.most_common(10)
for ip, count in top10:
    print(f"{ip}: {count} requests")

Это работает за миллисекунды. Замените источник на потоковый (генератор по файлу), и получите top-K без загрузки всего лога в память.

Merge join и sort: когда PostgreSQL выбирает MIN-heap для top-K
Проверка знанийKnowledge check
Почему min-heap размера k идеален для top-K наибольших, и почему heapq.nlargest предпочтительнее sorted()[:k] для больших n при маленьком k?
ОтветAnswer
Min-heap размера k хранит ровно k лучших кандидатов. Корень — минимум из них = самый слабый кандидат. Когда приходит новый элемент: если он больше корня (значит, лучше слабейшего из top-K), вытесняем корень и вставляем новый — O(log k). Если меньше — пропускаем (новый не в top-K). После прохода всех n элементов в heap остаются k наибольших. Сложность O(n log k) — на маленьком k log k практически константа. sorted()[:k] делает полную сортировку всех n элементов за O(n log n), потом отбрасывает n-k. Это лишняя работа: при k=10 и n=10^6 sorted делает 20 миллионов сравнений на элементы, которые не нужны. nlargest делает ~10 миллионов на нужные. Память: sorted требует O(n) на копию массива, nlargest — O(k). На потоках (генератор) sorted() вообще не работает — нельзя сортировать поток. nlargest работает потоково с памятью O(k).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Какая сложность у top-K через min-heap размера K на массиве из n элементов, и почему она лучше sorted()[:k]?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5