Задача: найти top-K в потоке
Классическая задача DE: «найти 100 самых активных пользователей за день», «топ-10 самых медленных запросов», «100 страниц с наибольшим CTR». Везде задача одна: из n значений найти k наибольших (или наименьших).
Наивное решение — полная сортировка и взять первые k:
# O(n log n) — сортируем всё, берём верхушку
top_100 = sorted(events, key=lambda x: x.score, reverse=True)[:100]
На миллионе записей это работает, но избыточно: мы сортируем 999900 элементов, которые нам не нужны. Существует решение за O(n log k) — работает быстрее, экономит память.
Min-heap размера K
Идея: храним k наибольших элементов в min-heap размера k. Корень heap — самое маленькое из k наибольших. При обработке нового элемента:
- Если новый > root -> root заменяем на новый, восстанавливаем heap-инвариант.
- Иначе -> пропускаем.
После прохода всех n элементов в heap остаются k наибольших. Сложность: каждый элемент — O(log k) сравнение/вставка, всего n элементов = O(n log k).
import heapq
def top_k(items, k):
heap = []
for x in items:
if len(heap) < k:
heapq.heappush(heap, x) # пока не заполнили
elif x > heap[0]:
heapq.heapreplace(heap, x) # вытеснили root
return sorted(heap, reverse=True) # для красоты, не обязательно
items = [3, 7, 1, 9, 5, 8, 2, 6, 4]
print(top_k(items, 3)) # [9, 8, 7]
Каждый новый элемент сравнивается с корнем heap (минимум). Если больше — вытесняет.
Каждый элемент обрабатывается за O(log k) — это очень быстро при маленьком k. Если k=100, log k = 7 — практически 7 сравнений на элемент.
heapq.nlargest — готовое решение
В Python есть встроенная функция:
import heapq
items = [3, 7, 1, 9, 5, 8, 2, 6, 4]
print(heapq.nlargest(3, items)) # [9, 8, 7]
Под капотом — тот же алгоритм. Работает на любом итераторе:
# top-3 пар (user_id, score)
events = [
("alice", 100),
("bob", 200),
("carol", 50),
("dave", 300),
]
top3 = heapq.nlargest(3, events, key=lambda x: x[1])
print(top3) # [('dave', 300), ('bob', 200), ('alice', 100)]
heapq.nsmallest(k, items) — наоборот, k наименьших.
Сравнение с полной сортировкой
| подход | время | память | когда лучше |
|---|---|---|---|
| sorted(items)[:k] | O(n log n) | O(n) | k близко к n, нужна полная сортировка |
| heapq.nlargest(k, items) | O(n log k) | O(k) | k много меньше n (стандартный случай) |
| numpy.argpartition | O(n) | O(n) | очень большие массивы из чисел |
При k=10 и n=10^6: sorted делает 20 миллионов сравнений, heapq.nlargest — ~10^7 сравнений (быстрее в 2x) и не требует хранить весь массив в памяти.
Главное преимущество heapq — потоковая обработка. Можно скармливать генератор:
def event_stream():
for line in open("events.log"):
yield parse(line)
# top-100 без загрузки всех событий в RAM
top_100 = heapq.nlargest(100, event_stream(), key=lambda x: x.score)
Это работает с памятью O(100), независимо от размера файла.
Top-K по ключу с подсчётом
Часто нужно top-K не по сырому значению, а после агрегации. «Топ-10 пользователей по числу событий»:
from collections import Counter
import heapq
events = [...] # миллион событий
# вариант 1: Counter + most_common
counts = Counter(e.user_id for e in events)
top10 = counts.most_common(10) # heapq.nlargest внутри
# вариант 2: явно с heapq
counts = Counter(e.user_id for e in events)
top10 = heapq.nlargest(10, counts.items(), key=lambda x: x[1])
Counter.most_common(k) — это обёртка над heapq.nlargest, ничем не отличается. Используйте удобный.
Streaming top-K
Если данные поступают потоком и k фиксированный, держим heap размера k в памяти и обновляем на каждое событие:
import heapq
class StreamingTopK:
def __init__(self, k):
self.k = k
self.heap = [] # min-heap
def add(self, item, score):
if len(self.heap) < self.k:
heapq.heappush(self.heap, (score, item))
elif score > self.heap[0][0]:
heapq.heapreplace(self.heap, (score, item))
def top(self):
return sorted(self.heap, reverse=True)
st = StreamingTopK(3)
for item, score in [("a", 5), ("b", 10), ("c", 3), ("d", 8), ("e", 12), ("f", 1)]:
st.add(item, score)
print(st.top()) # [(12, 'e'), (10, 'b'), (8, 'd')]
На каждое событие — O(log k). На миллиарде событий с k=100 это 10^9 * log(100) ~ 7 * 10^9 операций. Несколько минут CPU на одно ядро.
Top-K с обновлениями
Сложнее, если ключи могут обновляться. Например, score пользователя растёт. Простой heap не подходит — не умеет «обновить элемент с конкретным id».
Решение — dict + SortedList (см. модуль 16):
from sortedcontainers import SortedList
class TopKDynamic:
def __init__(self, k):
self.k = k
self.scores = {} # user -> score
self.sorted = SortedList() # (score, user)
def update(self, user, new_score):
if user in self.scores:
old = self.scores[user]
self.sorted.remove((old, user)) # O(log n)
self.scores[user] = new_score
self.sorted.add((new_score, user)) # O(log n)
def top(self):
return [u for _, u in self.sorted[-self.k:]]
dict — для быстрого «текущий score пользователя». SortedList — для быстрого top-K. Каждая операция O(log n) — приемлемо для большинства задач.
В Redis Sorted Set (ZSET) реализует ровно это: skip list + hash для O(log n) update и O(log n + k) top-K. Используется во всех leaderboard-системах.
Когда вы видите задачу ‘найти k лучших из большого набора’ — первый рефлекс должен быть heapq.nlargest. Полная сортировка тратит время на 99 percent элементов, которые вам не нужны. Это самая частая микро-оптимизация в дата-инженерии.
Min-heap vs max-heap
Python heapq — это min-heap: корень — минимум. Поэтому top-K наибольших реализуется через min-heap размера k (root = самый маленький из k наибольших).
Если нужен max-heap (для top-K наименьших или явного top() — самый большой):
# трюк: храним отрицательные значения
heap = []
heapq.heappush(heap, -x) # вставка отрицательного
max_value = -heapq.heappop(heap)
Или через тупл с инверсией ключа:
heapq.heappush(heap, (-score, item)) # отрицательный score
heapq.nlargest уже учитывает это внутри.
Замеры
Сравним sorted vs heapq.nlargest на 1 миллионе:
import random
import time
import heapq
n = 1_000_000
items = [random.randint(0, n) for _ in range(n)]
k = 100
t0 = time.time()
top_sort = sorted(items, reverse=True)[:k]
t_sort = time.time() - t0
t0 = time.time()
top_heap = heapq.nlargest(k, items)
t_heap = time.time() - t0
assert top_sort == top_heap
print(f"sorted: {t_sort*1000:.1f} ms")
print(f"nlargest: {t_heap*1000:.1f} ms")
print(f"speedup: {t_sort/t_heap:.1f}x")
Типичный результат: sorted ~150 мс, nlargest ~80 мс. Ускорение около 2x. На k=10 разница ещё больше.
Попробуй сам
Streaming top-K с предметной задачей: «топ-10 IP-адресов по числу запросов в логе»:
from collections import Counter
import heapq
import random
# имитация log: 100k запросов
log = [f"10.0.{random.randint(0,255)}.{random.randint(0,255)}" for _ in range(100_000)]
# через Counter
counts = Counter(log)
top10 = counts.most_common(10)
for ip, count in top10:
print(f"{ip}: {count} requests")
Это работает за миллисекунды. Замените источник на потоковый (генератор по файлу), и получите top-K без загрузки всего лога в память.
Merge join и sort: когда PostgreSQL выбирает MIN-heap для top-K