Learning Platform
Глоссарий Troubleshooting
Урок 12.05 · 30 мин
Начальный
priority queuetop-KDijkstraexternal sortk-way merge

Где встречается heap в реальной DE-работе

Heap (priority queue) — не теоретический инструмент из учебника. Это рабочая лошадка в нескольких сценариях, с которыми вы встретитесь на любой DE-должности:

  1. Top-K из stream: real-time analytics, top users/events/products.
  2. Dijkstra и shortest path: маршрутизация, дерево зависимостей задач.
  3. Task scheduling: cron, job queue, fair scheduling.
  4. K-way merge: external sort, объединение отсортированных файлов.

В этом уроке разберём каждое применение с реальным кодом.

Top-K из stream

Самая частая задача в DE-стриминге. Нужно поддерживать топ-K самых популярных элементов из потока в миллиарды записей.

import heapq

class TopKStreaming:
    def __init__(self, k):
        self.k = k
        self.heap = []   # min-heap размера ≤ k
    
    def add(self, value):
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, value)
        elif value > self.heap[0]:
            heapq.heapreplace(self.heap, value)
    
    def get_top(self):
        return sorted(self.heap, reverse=True)

# использование
top = TopKStreaming(k=100)
for event in stream:
    top.add(event.score)

print(top.get_top())

Сложность: O(N log K) по времени, O(K) по памяти. Для N=1B и K=100: ~7N операций, ~30-60 секунд. Память — 100 элементов.

Top-K по составному критерию

В реальных задачах сортировка не по простому числу, а по композитному ключу:

import heapq

# топ-100 событий по score, при равенстве — по более новому timestamp
class Event:
    def __init__(self, event_id, score, timestamp):
        self.event_id = event_id
        self.score = score
        self.timestamp = timestamp

events_heap = []   # heap of (score, timestamp, event_id, event)

def add_event(e):
    key = (e.score, e.timestamp)
    if len(events_heap) < 100:
        heapq.heappush(events_heap, (key, e.event_id, e))
    elif key > events_heap[0][0]:
        heapq.heapreplace(events_heap, (key, e.event_id, e))

Заметьте: event_id добавлен между key и event как tie-breaker, чтобы избежать сравнения Event объектов при равенстве score и timestamp.

Dijkstra: shortest path

Алгоритм Dijkstra находит кратчайший путь от одной вершины графа до всех остальных. Использует priority queue по «текущей минимальной дистанции».

import heapq
from collections import defaultdict

def dijkstra(graph, start):
    """
    graph: {node: [(neighbor, weight), ...]}
    Возвращает {node: min_distance}.
    """
    distances = {start: 0}
    heap = [(0, start)]
    
    while heap:
        d, node = heapq.heappop(heap)
        if d > distances.get(node, float('inf')):
            continue   # уже нашли путь короче
        for neighbor, weight in graph[node]:
            new_dist = d + weight
            if new_dist < distances.get(neighbor, float('inf')):
                distances[neighbor] = new_dist
                heapq.heappush(heap, (new_dist, neighbor))
    
    return distances

Применения в DE:

  • Маршрутизация в системах доставки.
  • Дерево зависимостей DAG-задач в Airflow: «какая задача готова к выполнению».
  • Network topology в распределённых системах.
  • Shortest path в графах знаний.

Сложность Dijkstra с heap: O((V + E) log V) — где V вершины, E рёбра. Без heap (наивно искать минимум) было бы O(V²). Для разреженных графов heap намного быстрее.

Task scheduling

Cron-like планировщик задач с приоритетами:

import heapq
import itertools
import time

class TaskScheduler:
    def __init__(self):
        self.heap = []
        self._counter = itertools.count()   # tie-breaker
    
    def schedule(self, run_at, priority, task):
        """Запланировать task на run_at с приоритетом."""
        # сортируем сначала по времени, потом по приоритету
        entry = (run_at, priority, next(self._counter), task)
        heapq.heappush(self.heap, entry)
    
    def run_next(self):
        """Запускает следующую готовую к выполнению задачу."""
        if not self.heap:
            return None
        run_at, priority, _, task = self.heap[0]
        if run_at > time.time():
            return None   # ещё не пора
        heapq.heappop(self.heap)
        # выполнить task
        return task

scheduler = TaskScheduler()
scheduler.schedule(time.time() + 60, priority=1, task='send_email')
scheduler.schedule(time.time() + 10, priority=2, task='backup')
scheduler.schedule(time.time() + 30, priority=1, task='cleanup')

# main loop
while True:
    task = scheduler.run_next()
    if task:
        print(f"Running: {task}")
    time.sleep(1)

Это упрощённая модель real scheduler’ов вроде:

  • Apache Airflow scheduler: heap задач по execution_time.
  • Celery: priority queue для задач.
  • Linux scheduler: CFS (Completely Fair Scheduler) использует red-black tree, но идея та же.

K-way merge для external sort

Это, пожалуй, самое мощное применение heap в DE.

Задача: отсортировать файл размером 100 GB при 8 GB RAM.

Решение — external sort через k-way merge:

  1. Разбить файл на куски размером ~7 GB (помещающиеся в RAM).
  2. Отсортировать каждый кусок в RAM (стандартный sort) и записать на диск.
  3. Слить все куски через k-way merge с heap’ом.
import heapq

def merge_sorted_files(*filenames, output):
    """Сливает N отсортированных файлов в один отсортированный output."""
    files = [open(f) for f in filenames]
    
    with open(output, 'w') as out:
        # heapq.merge принимает iterators, возвращает iterator
        for line in heapq.merge(*files):
            out.write(line)
    
    for f in files:
        f.close()

# external sort
def external_sort(input_file, output_file, chunk_size=1_000_000):
    """Сортировка файла, не помещающегося в RAM."""
    # шаг 1-2: разбить и отсортировать
    chunk_files = []
    with open(input_file) as f:
        chunk = []
        chunk_num = 0
        for line in f:
            chunk.append(line)
            if len(chunk) >= chunk_size:
                chunk.sort()
                chunk_filename = f'chunk_{chunk_num}.txt'
                with open(chunk_filename, 'w') as cf:
                    cf.writelines(chunk)
                chunk_files.append(chunk_filename)
                chunk_num += 1
                chunk = []
        if chunk:
            chunk.sort()
            chunk_filename = f'chunk_{chunk_num}.txt'
            with open(chunk_filename, 'w') as cf:
                cf.writelines(chunk)
            chunk_files.append(chunk_filename)
    
    # шаг 3: k-way merge
    merge_sorted_files(*chunk_files, output=output_file)
    
    # cleanup
    import os
    for f in chunk_files:
        os.remove(f)

Сложность: O(N log N) total — sort кусков + merge, что эквивалентно одному большому sort, но с фиксированной памятью O(chunk_size).

heapq.merge — это generator: он не загружает все данные в RAM, читает по одному элементу из каждого файла, выдаёт минимум, продвигает соответствующий файл. Constant memory O(K) — только K «текущих голов».

Этот алгоритм работает в basis всех больших sort’ов: PostgreSQL ORDER BY на больших таблицах, Hadoop MapReduce shuffle, Apache Spark sort, Snowflake compaction. Везде heap по «головам».

Top-K за временное окно

Sliding window top-K — частая задача:

«Найди топ-100 самых популярных событий за последние 60 секунд».

Это сложнее, чем общий top-K, потому что элементы истекают:

import heapq
import time
from collections import deque

class SlidingTopK:
    def __init__(self, k, window_seconds):
        self.k = k
        self.window = window_seconds
        # deque всех событий с timestamp'ами
        self.events = deque()
        # heap для top-K
        # элементы: (-score, event_id, timestamp)
        # negate score, потому что heapq это min-heap, нам надо max
    
    def add(self, event_id, score):
        now = time.time()
        self._expire(now)
        self.events.append((event_id, score, now))
    
    def _expire(self, now):
        # убрать события старше window
        while self.events and self.events[0][2] < now - self.window:
            self.events.popleft()
    
    def get_top(self):
        self._expire(time.time())
        # heapq.nlargest сортирует по второму элементу tuple
        return heapq.nlargest(self.k, self.events, key=lambda e: e[1])

window = SlidingTopK(k=10, window_seconds=60)
window.add('event_1', 100)
window.add('event_2', 50)
window.add('event_3', 75)

print(window.get_top())

Это уже сложнее, потому что heap не поддерживает «удалить произвольный элемент». Реальные production-системы используют либо:

  • Lazy expiration: помещать в heap, проверять при extract, что не expired.
  • Count-Min Sketch + heap: probabilistic top-K, гораздо быстрее, но approximate.
  • Stream-summary: специальная структура для exact top-K в окне.

Применения в реальной DE-стек

Где встречается priority queue в DE-системах

От real-time analytics до сортировки терабайтов.

DE-стек
Spark/Hadoop sortexternal sort через k-way mergeразбивают на partitions, сортируют, merge через heap
Kafka Streamswatermark heap по времениотслеживание progress через heap timestamp'ов
Airflowtask scheduling по execution_timeheap задач
PostgreSQLORDER BY ... LIMIT Nкогда N << total, использует top-K через heap вместо full sort
Redis SortedSetне heap, BST-подобная структурадля general range queries
Cassandra/RocksDBLSM-tree compactionmerge sorted SSTables через heap

Финальный пример: топ-100 событий за окно

Допустим, у вас сервис аналитики, который должен показывать «топ-100 самых популярных страниц за последний час». Поток событий — 10К/секунду = 36М/час.

import heapq
from collections import deque, Counter
import time

class WindowedTopPages:
    def __init__(self, k=100, window_seconds=3600):
        self.k = k
        self.window = window_seconds
        # все события в окне с timestamp'ами
        self.events = deque()      # (page_id, timestamp)
        # счётчик частот для всех page_id в окне
        self.counter = Counter()
    
    def add(self, page_id):
        now = time.time()
        self._expire(now)
        self.events.append((page_id, now))
        self.counter[page_id] += 1
    
    def _expire(self, now):
        while self.events and self.events[0][1] < now - self.window:
            old_id, _ = self.events.popleft()
            self.counter[old_id] -= 1
            if self.counter[old_id] == 0:
                del self.counter[old_id]
    
    def get_top(self):
        self._expire(time.time())
        # heapq.nlargest — это O(N log K) for N total unique pages
        return self.counter.most_common(self.k)

# использование
analytics = WindowedTopPages(k=100, window_seconds=3600)

# симулируем поток
import random
for _ in range(1_000_000):
    page = f"page_{random.randint(1, 10_000)}"
    analytics.add(page)

print(f"Top 10: {analytics.get_top()[:10]}")

Memory: O(window_size) events + O(unique pages) counter. На 36M событий и 100К уникальных страниц — ~36M × 50 байт + 100К × 100 байт ~ 1.8 GB.

Counter.most_common(k) внутри использует heap (heapq.nlargest по значениям счётчика) — O(N log K), не O(N log N) как полная сортировка.

Попробуй сам

import heapq
import time
import random

# 1. Top-K с min-heap (классический паттерн)
def top_k(data, k):
    heap = []
    for x in data:
        if len(heap) < k:
            heapq.heappush(heap, x)
        elif x > heap[0]:
            heapq.heapreplace(heap, x)
    return sorted(heap, reverse=True)

# сравните с heapq.nlargest
data = [random.random() for _ in range(1_000_000)]

start = time.perf_counter()
top1 = top_k(data, 100)
t1 = time.perf_counter() - start

start = time.perf_counter()
top2 = heapq.nlargest(100, data)
t2 = time.perf_counter() - start

print(f"manual top_k: {t1*1000:.0f} ms")
print(f"nlargest:     {t2*1000:.0f} ms")
print(f"results equal: {top1 == top2}")

# 2. K-way merge нескольких отсортированных потоков
streams = [
    sorted(random.sample(range(100), 20)) for _ in range(5)
]
print(f"\n5 streams of 20 elements each:")
for i, s in enumerate(streams):
    print(f"  stream {i}: {s[:5]}... {s[-3:]}")

merged = list(heapq.merge(*streams))
print(f"Merged ({len(merged)} elements): {merged[:10]}...")
print(f"Is sorted: {merged == sorted(merged)}")

# 3. Task scheduler с priority и tie-breaker
import itertools

class TaskQueue:
    def __init__(self):
        self.heap = []
        self.counter = itertools.count()
    
    def add(self, priority, task):
        heapq.heappush(self.heap, (priority, next(self.counter), task))
    
    def next(self):
        if not self.heap:
            return None
        p, _, t = heapq.heappop(self.heap)
        return p, t

q = TaskQueue()
q.add(3, "low_priority_task")
q.add(1, "urgent_task")
q.add(1, "another_urgent")
q.add(2, "medium_task")

while True:
    n = q.next()
    if n is None:
        break
    print(f"Run priority {n[0]}: {n[1]}")
NOTE

Это завершает модуль про heap. Запомните три главных DE-паттерна: (1) Top-K из stream — min-heap размера K, heappush/heapreplace; (2) K-way merge — heapq.merge для external sort, O(K) memory; (3) Task scheduling — heap по (run_at, priority, tie_breaker, task). Все используют один и тот же heapq, но в разных конфигурациях. Если видите задачу “найти минимум/максимум/top-K из стрима” — это heap.

В следующем модуле — графы: представления (adjacency list, matrix), basic операции, и связь с деревьями.

Merge Join: zip двух отсортированных потоков
Проверка знанийKnowledge check
Спарк сортирует терабайт данных при ограниченной RAM на каждом воркере. Как используется heap (priority queue) в этом процессе?
ОтветAnswer
Spark использует k-way merge через heap в нескольких местах. (1) Каждый воркер берёт свою partition (например, 10 GB при 8 GB RAM), сортирует чанки в RAM по 6 GB каждый, выписывает их на диск как отсортированные spill-файлы. (2) После всех spill'ов запускается k-way merge: heap из K=count_of_spills элементов, каждый элемент — текущая строка из соответствующего spill-файла. heap[0] — глобально-минимум среди голов всех spill'ов; pop, записать в финальный output, advance соответствующий spill. Сложность O(N log K) — каждая строка проходит через одну heap-операцию. Память O(K) — только K буферов чтения, не O(N). (3) В shuffle stage то же самое: каждый reducer получает sorted chunks из всех mappers, делает k-way merge. Это позволяет сортировать терабайт при 8 GB RAM — spill+merge не требует всех данных в RAM. heapq в Python и реализации в Spark/Hadoop сделаны на одном принципе: priority queue по «головам» отсортированных потоков. Это и есть причина, почему heap — fundamental структура в DE: ABсолютно все sort/aggregate операции в распределённых системах опираются на k-way merge.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. Как реализуется external sort терабайтного файла через heap?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5