Где встречается heap в реальной DE-работе
Heap (priority queue) — не теоретический инструмент из учебника. Это рабочая лошадка в нескольких сценариях, с которыми вы встретитесь на любой DE-должности:
- Top-K из stream: real-time analytics, top users/events/products.
- Dijkstra и shortest path: маршрутизация, дерево зависимостей задач.
- Task scheduling: cron, job queue, fair scheduling.
- K-way merge: external sort, объединение отсортированных файлов.
В этом уроке разберём каждое применение с реальным кодом.
Top-K из stream
Самая частая задача в DE-стриминге. Нужно поддерживать топ-K самых популярных элементов из потока в миллиарды записей.
import heapq
class TopKStreaming:
def __init__(self, k):
self.k = k
self.heap = [] # min-heap размера ≤ k
def add(self, value):
if len(self.heap) < self.k:
heapq.heappush(self.heap, value)
elif value > self.heap[0]:
heapq.heapreplace(self.heap, value)
def get_top(self):
return sorted(self.heap, reverse=True)
# использование
top = TopKStreaming(k=100)
for event in stream:
top.add(event.score)
print(top.get_top())
Сложность: O(N log K) по времени, O(K) по памяти. Для N=1B и K=100: ~7N операций, ~30-60 секунд. Память — 100 элементов.
Top-K по составному критерию
В реальных задачах сортировка не по простому числу, а по композитному ключу:
import heapq
# топ-100 событий по score, при равенстве — по более новому timestamp
class Event:
def __init__(self, event_id, score, timestamp):
self.event_id = event_id
self.score = score
self.timestamp = timestamp
events_heap = [] # heap of (score, timestamp, event_id, event)
def add_event(e):
key = (e.score, e.timestamp)
if len(events_heap) < 100:
heapq.heappush(events_heap, (key, e.event_id, e))
elif key > events_heap[0][0]:
heapq.heapreplace(events_heap, (key, e.event_id, e))
Заметьте: event_id добавлен между key и event как tie-breaker, чтобы избежать сравнения Event объектов при равенстве score и timestamp.
Dijkstra: shortest path
Алгоритм Dijkstra находит кратчайший путь от одной вершины графа до всех остальных. Использует priority queue по «текущей минимальной дистанции».
import heapq
from collections import defaultdict
def dijkstra(graph, start):
"""
graph: {node: [(neighbor, weight), ...]}
Возвращает {node: min_distance}.
"""
distances = {start: 0}
heap = [(0, start)]
while heap:
d, node = heapq.heappop(heap)
if d > distances.get(node, float('inf')):
continue # уже нашли путь короче
for neighbor, weight in graph[node]:
new_dist = d + weight
if new_dist < distances.get(neighbor, float('inf')):
distances[neighbor] = new_dist
heapq.heappush(heap, (new_dist, neighbor))
return distances
Применения в DE:
- Маршрутизация в системах доставки.
- Дерево зависимостей DAG-задач в Airflow: «какая задача готова к выполнению».
- Network topology в распределённых системах.
- Shortest path в графах знаний.
Сложность Dijkstra с heap: O((V + E) log V) — где V вершины, E рёбра. Без heap (наивно искать минимум) было бы O(V²). Для разреженных графов heap намного быстрее.
Task scheduling
Cron-like планировщик задач с приоритетами:
import heapq
import itertools
import time
class TaskScheduler:
def __init__(self):
self.heap = []
self._counter = itertools.count() # tie-breaker
def schedule(self, run_at, priority, task):
"""Запланировать task на run_at с приоритетом."""
# сортируем сначала по времени, потом по приоритету
entry = (run_at, priority, next(self._counter), task)
heapq.heappush(self.heap, entry)
def run_next(self):
"""Запускает следующую готовую к выполнению задачу."""
if not self.heap:
return None
run_at, priority, _, task = self.heap[0]
if run_at > time.time():
return None # ещё не пора
heapq.heappop(self.heap)
# выполнить task
return task
scheduler = TaskScheduler()
scheduler.schedule(time.time() + 60, priority=1, task='send_email')
scheduler.schedule(time.time() + 10, priority=2, task='backup')
scheduler.schedule(time.time() + 30, priority=1, task='cleanup')
# main loop
while True:
task = scheduler.run_next()
if task:
print(f"Running: {task}")
time.sleep(1)
Это упрощённая модель real scheduler’ов вроде:
- Apache Airflow scheduler: heap задач по execution_time.
- Celery: priority queue для задач.
- Linux scheduler: CFS (Completely Fair Scheduler) использует red-black tree, но идея та же.
K-way merge для external sort
Это, пожалуй, самое мощное применение heap в DE.
Задача: отсортировать файл размером 100 GB при 8 GB RAM.
Решение — external sort через k-way merge:
- Разбить файл на куски размером ~7 GB (помещающиеся в RAM).
- Отсортировать каждый кусок в RAM (стандартный sort) и записать на диск.
- Слить все куски через k-way merge с heap’ом.
import heapq
def merge_sorted_files(*filenames, output):
"""Сливает N отсортированных файлов в один отсортированный output."""
files = [open(f) for f in filenames]
with open(output, 'w') as out:
# heapq.merge принимает iterators, возвращает iterator
for line in heapq.merge(*files):
out.write(line)
for f in files:
f.close()
# external sort
def external_sort(input_file, output_file, chunk_size=1_000_000):
"""Сортировка файла, не помещающегося в RAM."""
# шаг 1-2: разбить и отсортировать
chunk_files = []
with open(input_file) as f:
chunk = []
chunk_num = 0
for line in f:
chunk.append(line)
if len(chunk) >= chunk_size:
chunk.sort()
chunk_filename = f'chunk_{chunk_num}.txt'
with open(chunk_filename, 'w') as cf:
cf.writelines(chunk)
chunk_files.append(chunk_filename)
chunk_num += 1
chunk = []
if chunk:
chunk.sort()
chunk_filename = f'chunk_{chunk_num}.txt'
with open(chunk_filename, 'w') as cf:
cf.writelines(chunk)
chunk_files.append(chunk_filename)
# шаг 3: k-way merge
merge_sorted_files(*chunk_files, output=output_file)
# cleanup
import os
for f in chunk_files:
os.remove(f)
Сложность: O(N log N) total — sort кусков + merge, что эквивалентно одному большому sort, но с фиксированной памятью O(chunk_size).
heapq.merge — это generator: он не загружает все данные в RAM, читает по одному элементу из каждого файла, выдаёт минимум, продвигает соответствующий файл. Constant memory O(K) — только K «текущих голов».
Этот алгоритм работает в basis всех больших sort’ов: PostgreSQL ORDER BY на больших таблицах, Hadoop MapReduce shuffle, Apache Spark sort, Snowflake compaction. Везде heap по «головам».
Top-K за временное окно
Sliding window top-K — частая задача:
«Найди топ-100 самых популярных событий за последние 60 секунд».
Это сложнее, чем общий top-K, потому что элементы истекают:
import heapq
import time
from collections import deque
class SlidingTopK:
def __init__(self, k, window_seconds):
self.k = k
self.window = window_seconds
# deque всех событий с timestamp'ами
self.events = deque()
# heap для top-K
# элементы: (-score, event_id, timestamp)
# negate score, потому что heapq это min-heap, нам надо max
def add(self, event_id, score):
now = time.time()
self._expire(now)
self.events.append((event_id, score, now))
def _expire(self, now):
# убрать события старше window
while self.events and self.events[0][2] < now - self.window:
self.events.popleft()
def get_top(self):
self._expire(time.time())
# heapq.nlargest сортирует по второму элементу tuple
return heapq.nlargest(self.k, self.events, key=lambda e: e[1])
window = SlidingTopK(k=10, window_seconds=60)
window.add('event_1', 100)
window.add('event_2', 50)
window.add('event_3', 75)
print(window.get_top())
Это уже сложнее, потому что heap не поддерживает «удалить произвольный элемент». Реальные production-системы используют либо:
- Lazy expiration: помещать в heap, проверять при extract, что не expired.
- Count-Min Sketch + heap: probabilistic top-K, гораздо быстрее, но approximate.
- Stream-summary: специальная структура для exact top-K в окне.
Применения в реальной DE-стек
От real-time analytics до сортировки терабайтов.
Финальный пример: топ-100 событий за окно
Допустим, у вас сервис аналитики, который должен показывать «топ-100 самых популярных страниц за последний час». Поток событий — 10К/секунду = 36М/час.
import heapq
from collections import deque, Counter
import time
class WindowedTopPages:
def __init__(self, k=100, window_seconds=3600):
self.k = k
self.window = window_seconds
# все события в окне с timestamp'ами
self.events = deque() # (page_id, timestamp)
# счётчик частот для всех page_id в окне
self.counter = Counter()
def add(self, page_id):
now = time.time()
self._expire(now)
self.events.append((page_id, now))
self.counter[page_id] += 1
def _expire(self, now):
while self.events and self.events[0][1] < now - self.window:
old_id, _ = self.events.popleft()
self.counter[old_id] -= 1
if self.counter[old_id] == 0:
del self.counter[old_id]
def get_top(self):
self._expire(time.time())
# heapq.nlargest — это O(N log K) for N total unique pages
return self.counter.most_common(self.k)
# использование
analytics = WindowedTopPages(k=100, window_seconds=3600)
# симулируем поток
import random
for _ in range(1_000_000):
page = f"page_{random.randint(1, 10_000)}"
analytics.add(page)
print(f"Top 10: {analytics.get_top()[:10]}")
Memory: O(window_size) events + O(unique pages) counter. На 36M событий и 100К уникальных страниц — ~36M × 50 байт + 100К × 100 байт ~ 1.8 GB.
Counter.most_common(k) внутри использует heap (heapq.nlargest по значениям счётчика) — O(N log K), не O(N log N) как полная сортировка.
Попробуй сам
import heapq
import time
import random
# 1. Top-K с min-heap (классический паттерн)
def top_k(data, k):
heap = []
for x in data:
if len(heap) < k:
heapq.heappush(heap, x)
elif x > heap[0]:
heapq.heapreplace(heap, x)
return sorted(heap, reverse=True)
# сравните с heapq.nlargest
data = [random.random() for _ in range(1_000_000)]
start = time.perf_counter()
top1 = top_k(data, 100)
t1 = time.perf_counter() - start
start = time.perf_counter()
top2 = heapq.nlargest(100, data)
t2 = time.perf_counter() - start
print(f"manual top_k: {t1*1000:.0f} ms")
print(f"nlargest: {t2*1000:.0f} ms")
print(f"results equal: {top1 == top2}")
# 2. K-way merge нескольких отсортированных потоков
streams = [
sorted(random.sample(range(100), 20)) for _ in range(5)
]
print(f"\n5 streams of 20 elements each:")
for i, s in enumerate(streams):
print(f" stream {i}: {s[:5]}... {s[-3:]}")
merged = list(heapq.merge(*streams))
print(f"Merged ({len(merged)} elements): {merged[:10]}...")
print(f"Is sorted: {merged == sorted(merged)}")
# 3. Task scheduler с priority и tie-breaker
import itertools
class TaskQueue:
def __init__(self):
self.heap = []
self.counter = itertools.count()
def add(self, priority, task):
heapq.heappush(self.heap, (priority, next(self.counter), task))
def next(self):
if not self.heap:
return None
p, _, t = heapq.heappop(self.heap)
return p, t
q = TaskQueue()
q.add(3, "low_priority_task")
q.add(1, "urgent_task")
q.add(1, "another_urgent")
q.add(2, "medium_task")
while True:
n = q.next()
if n is None:
break
print(f"Run priority {n[0]}: {n[1]}")
Это завершает модуль про heap. Запомните три главных DE-паттерна: (1) Top-K из stream — min-heap размера K, heappush/heapreplace; (2) K-way merge — heapq.merge для external sort, O(K) memory; (3) Task scheduling — heap по (run_at, priority, tie_breaker, task). Все используют один и тот же heapq, но в разных конфигурациях. Если видите задачу “найти минимум/максимум/top-K из стрима” — это heap.
В следующем модуле — графы: представления (adjacency list, matrix), basic операции, и связь с деревьями.
Merge Join: zip двух отсортированных потоков