Learning Platform
Глоссарий Troubleshooting
Урок 19.05 · 25 мин
Начальный
k-way-mergeexternal-sortspark-shuffleheapq-mergelog-aggregation

Зачем нужно сливать K потоков

Реальные задачи:

  1. External sort. Файл 100 ГБ не помещается в RAM. Делим на чанки по 1 ГБ, сортируем каждый в памяти, записываем на диск. Получили 100 отсортированных файлов. Нужно слить их в один отсортированный.
  2. Log aggregation. Сервис пишет логи в разные файлы (по дням или по серверам). Все отсортированы по timestamp. Нужно объединить в один поток для анализа.
  3. Spark shuffle. При groupBy / join Spark делит данные на partitions, сортирует, потом сливает по ключу.
  4. Time-series merge. Несколько метрик с timestamp нужно объединить в один поток для совместного анализа.

Задача формулируется одинаково: даны K отсортированных потоков, объединить их в один отсортированный поток.

Наивное решение

Простейший подход — собрать всё в список и пересортировать:

def merge_naive(streams):
    result = []
    for s in streams:
        result.extend(s)
    result.sort()
    return result

O(n log n), где n — общее число элементов. Игнорирует факт, что потоки уже отсортированы. На больших данных не работает — требует хранить всё в памяти, плюс пересортировка.

Heap-based merge

Правильное решение — использовать min-heap размера K. На каждом шаге берём минимум среди голов всех потоков:

import heapq

def merge_k_streams(streams):
    """streams — список итераторов отсортированных данных."""
    heap = []

    # инициализируем heap первым элементом каждого потока
    for i, stream in enumerate(streams):
        it = iter(stream)
        try:
            value = next(it)
            heap.append((value, i, it))
        except StopIteration:
            pass
    heapq.heapify(heap)

    # пока есть данные в heap
    while heap:
        value, i, it = heapq.heappop(heap)
        yield value
        try:
            next_value = next(it)
            heapq.heappush(heap, (next_value, i, it))
        except StopIteration:
            pass

Сложность: каждое значение проходит через push + pop heap = O(log K). Всего n значений, общая сложность O(n log K). Память O(K) — храним только по одному значению из каждого потока.

K-way merge через min-heap

Heap всегда содержит головы всех потоков. На каждом шаге берём минимум, продвигаем нужный поток.

stream 1[1, 4, 7, 10]отсортированный поток A
stream 2[2, 5, 8, 11]отсортированный поток B
stream 3[3, 6, 9, 12]отсортированный поток C
heap старт[1, 2, 3]по одному из каждого потока
pop1минимум, выводим
heap[2, 4, 3]взяли следующий из stream 1
pop2следующий минимум
heap[3, 4, 5]взяли из stream 2
pop3и так далее

heapq.merge — готовое решение

В Python есть встроенная функция:

import heapq

a = [1, 4, 7]
b = [2, 5, 8]
c = [3, 6, 9]

merged = list(heapq.merge(a, b, c))
print(merged)   # [1, 2, 3, 4, 5, 6, 7, 8, 9]

heapq.merge принимает несколько итераторов и возвращает генератор отсортированного потока. Работает потоково — память O(K). Поддерживает key и reverse.

С файлами:

import heapq

def file_stream(path):
    with open(path) as f:
        for line in f:
            yield int(line.strip())

streams = [file_stream(f"sorted_chunk_{i}.txt") for i in range(100)]
with open("merged.txt", "w") as out:
    for value in heapq.merge(*streams):
        out.write(f"{value}\n")

Это сливает 100 отсортированных файлов любого размера за O(n log 100) времени и O(100) памяти. На 100 ГБ данных — ровно так работает external sort в production.

External sort: полный алгоритм

Полный алгоритм сортировки данных, не помещающихся в RAM:

import heapq
import os

def external_sort(input_path, output_path, chunk_size_lines=1_000_000):
    # шаг 1: разделить и отсортировать чанки в RAM
    chunks = []
    with open(input_path) as f:
        chunk = []
        for line in f:
            chunk.append(int(line.strip()))
            if len(chunk) >= chunk_size_lines:
                chunk.sort()
                path = f"chunk_{len(chunks)}.tmp"
                with open(path, "w") as out:
                    for x in chunk:
                        out.write(f"{x}\n")
                chunks.append(path)
                chunk = []
        if chunk:
            chunk.sort()
            path = f"chunk_{len(chunks)}.tmp"
            with open(path, "w") as out:
                for x in chunk:
                    out.write(f"{x}\n")
            chunks.append(path)

    # шаг 2: K-way merge
    streams = []
    for path in chunks:
        f = open(path)
        streams.append((int(line.strip()) for line in f))

    with open(output_path, "w") as out:
        for value in heapq.merge(*streams):
            out.write(f"{value}\n")

    # шаг 3: cleanup
    for path in chunks:
        os.remove(path)

Это классика. Используется в реальных СУБД (PostgreSQL, MySQL), в Hadoop MapReduce, в Spark.

Сложность

операциявремяпамять
разделить и сортировать чанкиO(n log m), m = chunk sizeO(m)
K-way mergeO(n log K), K = число чанковO(K)
общаяO(n log n)O(max(m, K))

На 100 ГБ с чанками по 1 ГБ: 100 чанков, по 10^9 элементов в каждом. Время O(n log n) = O(10^11 * 30) операций — десятки минут CPU. Память O(10^9) на чанк + O(100) на merge — RAM 8-16 ГБ достаточно.

Spark shuffle: как это работает в реальной системе

Когда вы делаете .groupBy("key").agg(...) в Spark, происходит shuffle:

  1. Map phase. Каждый worker разделяет свои данные на partitions по ключу (hash partitioner).
  2. Shuffle write. Worker сортирует свои partition’ы по ключу и пишет на диск.
  3. Shuffle read. Reducer-worker подтягивает свои partition’ы со всех map-workers.
  4. Merge. Reducer делает K-way merge подтянутых файлов по ключу, потом агрегирует.

Это тот же алгоритм, что мы только что разобрали, просто distributed. heapq.merge -> org.apache.spark.util.collection.ExternalSorter.

Когда merge не подходит

  1. Потоки не отсортированы. K-way merge требует отсортированных входов. Если потоки не отсортированы, нужно сначала отсортировать (увеличивает сложность).
  2. Очень много потоков. При K=10000 log K = 14, но overhead на 10000 file descriptors / connections может быть критичным. Решение — двухуровневый merge: сначала merge 100 потоков по 100, потом merge 100 «промежуточных».
  3. Дубликаты с дедупликацией. Если нужна дедупликация при merge — нужно отдельно обрабатывать (например, при равенстве выводить только один).

Замеры

Сравним merge через heapq и через sort:

import heapq
import time
import random

# 10 отсортированных потоков по 100к элементов
streams = []
for i in range(10):
    s = sorted(random.randint(0, 10**9) for _ in range(100_000))
    streams.append(s)

n = sum(len(s) for s in streams)
print(f"total elements: {n}")

# через heapq.merge
t0 = time.time()
merged = list(heapq.merge(*streams))
t_heap = time.time() - t0
print(f"heapq.merge: {t_heap:.2f} s")

# через concat + sort
all_data = []
for s in streams:
    all_data.extend(s)
t0 = time.time()
all_data.sort()
t_sort = time.time() - t0
print(f"concat + sort: {t_sort:.2f} s")

Типичный результат: heapq.merge ~3 секунды, concat + sort ~5 секунд. Heap merge быстрее, и память O(K) против O(n). На потоковых данных heap merge — единственный вариант.

TIP

Если вы пишете ETL, который читает данные из нескольких отсортированных источников (например, partitioned Parquet с time-based key) — heapq.merge даст вам потоковую обработку с минимальной памятью. Это самый недооценённый инструмент в стандартной библиотеке Python.

Попробуй сам

Объедините несколько отсортированных файлов логов:

import heapq
import tempfile
import os

# создадим три файла с отсортированными timestamps
files = []
for chunk in [[1, 4, 7, 10], [2, 5, 8, 11], [3, 6, 9, 12]]:
    fd, path = tempfile.mkstemp(suffix=".log")
    os.close(fd)
    with open(path, "w") as f:
        for x in chunk:
            f.write(f"{x}\n")
    files.append(path)

# объединяем потоково
def file_ints(path):
    with open(path) as f:
        for line in f:
            yield int(line.strip())

streams = [file_ints(p) for p in files]
merged = list(heapq.merge(*streams))
print(merged)   # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

# cleanup
for p in files:
    os.remove(p)

Это работает на файлах любого размера, не загружая их в память.

Merge join в PostgreSQL: K-way merge как основа соединения таблиц yield from: делегирование в генераторах Python
Проверка знанийKnowledge check
Объясните, как Spark shuffle при groupBy использует K-way merge, и почему общая сложность операций O(n log n) даже при distributed обработке миллиардов строк. Какое преимущество heap-merge даёт перед наивным 'collect и sort'?
ОтветAnswer
Spark shuffle разделяет данные на partitions по ключу (hash partitioner). Каждый worker сортирует свои partition'ы локально (O(m log m) где m — размер partition). Потом reducer подтягивает все partition'ы со всех workers и делает K-way merge через heap, объединяя их в один отсортированный поток по ключу (O(n log K)). Локальная сортировка n записей разделённых на K partition'ов: K * O(m log m) = O(n log m). Merge: O(n log K). Итого O(n * (log m + log K)) = O(n * log(m*K)) = O(n log n). Главное преимущество heap-merge над 'collect и sort': 1) Потоковая обработка — не нужно держать все n записей в одной памяти, только K голов потоков. На миллиарде строк это разница между OOM и 'работает'. 2) Параллелизуемость — каждый worker сортирует свою долю независимо, finalmerge может быть распределён. 3) Стримминговый вывод — результаты merge можно сразу писать в downstream-агрегацию, не дожидаясь полного результата. Это и есть основа всех distributed sort алгоритмов (Spark, Hadoop, BigQuery shuffle).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. K-way merge через min-heap имеет сложность O(n log K). Почему не O(n log n) как обычная сортировка?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5