Зачем нужно сливать K потоков
Реальные задачи:
- External sort. Файл 100 ГБ не помещается в RAM. Делим на чанки по 1 ГБ, сортируем каждый в памяти, записываем на диск. Получили 100 отсортированных файлов. Нужно слить их в один отсортированный.
- Log aggregation. Сервис пишет логи в разные файлы (по дням или по серверам). Все отсортированы по timestamp. Нужно объединить в один поток для анализа.
- Spark shuffle. При groupBy / join Spark делит данные на partitions, сортирует, потом сливает по ключу.
- Time-series merge. Несколько метрик с timestamp нужно объединить в один поток для совместного анализа.
Задача формулируется одинаково: даны K отсортированных потоков, объединить их в один отсортированный поток.
Наивное решение
Простейший подход — собрать всё в список и пересортировать:
def merge_naive(streams):
result = []
for s in streams:
result.extend(s)
result.sort()
return result
O(n log n), где n — общее число элементов. Игнорирует факт, что потоки уже отсортированы. На больших данных не работает — требует хранить всё в памяти, плюс пересортировка.
Heap-based merge
Правильное решение — использовать min-heap размера K. На каждом шаге берём минимум среди голов всех потоков:
import heapq
def merge_k_streams(streams):
"""streams — список итераторов отсортированных данных."""
heap = []
# инициализируем heap первым элементом каждого потока
for i, stream in enumerate(streams):
it = iter(stream)
try:
value = next(it)
heap.append((value, i, it))
except StopIteration:
pass
heapq.heapify(heap)
# пока есть данные в heap
while heap:
value, i, it = heapq.heappop(heap)
yield value
try:
next_value = next(it)
heapq.heappush(heap, (next_value, i, it))
except StopIteration:
pass
Сложность: каждое значение проходит через push + pop heap = O(log K). Всего n значений, общая сложность O(n log K). Память O(K) — храним только по одному значению из каждого потока.
Heap всегда содержит головы всех потоков. На каждом шаге берём минимум, продвигаем нужный поток.
heapq.merge — готовое решение
В Python есть встроенная функция:
import heapq
a = [1, 4, 7]
b = [2, 5, 8]
c = [3, 6, 9]
merged = list(heapq.merge(a, b, c))
print(merged) # [1, 2, 3, 4, 5, 6, 7, 8, 9]
heapq.merge принимает несколько итераторов и возвращает генератор отсортированного потока. Работает потоково — память O(K). Поддерживает key и reverse.
С файлами:
import heapq
def file_stream(path):
with open(path) as f:
for line in f:
yield int(line.strip())
streams = [file_stream(f"sorted_chunk_{i}.txt") for i in range(100)]
with open("merged.txt", "w") as out:
for value in heapq.merge(*streams):
out.write(f"{value}\n")
Это сливает 100 отсортированных файлов любого размера за O(n log 100) времени и O(100) памяти. На 100 ГБ данных — ровно так работает external sort в production.
External sort: полный алгоритм
Полный алгоритм сортировки данных, не помещающихся в RAM:
import heapq
import os
def external_sort(input_path, output_path, chunk_size_lines=1_000_000):
# шаг 1: разделить и отсортировать чанки в RAM
chunks = []
with open(input_path) as f:
chunk = []
for line in f:
chunk.append(int(line.strip()))
if len(chunk) >= chunk_size_lines:
chunk.sort()
path = f"chunk_{len(chunks)}.tmp"
with open(path, "w") as out:
for x in chunk:
out.write(f"{x}\n")
chunks.append(path)
chunk = []
if chunk:
chunk.sort()
path = f"chunk_{len(chunks)}.tmp"
with open(path, "w") as out:
for x in chunk:
out.write(f"{x}\n")
chunks.append(path)
# шаг 2: K-way merge
streams = []
for path in chunks:
f = open(path)
streams.append((int(line.strip()) for line in f))
with open(output_path, "w") as out:
for value in heapq.merge(*streams):
out.write(f"{value}\n")
# шаг 3: cleanup
for path in chunks:
os.remove(path)
Это классика. Используется в реальных СУБД (PostgreSQL, MySQL), в Hadoop MapReduce, в Spark.
Сложность
| операция | время | память |
|---|---|---|
| разделить и сортировать чанки | O(n log m), m = chunk size | O(m) |
| K-way merge | O(n log K), K = число чанков | O(K) |
| общая | O(n log n) | O(max(m, K)) |
На 100 ГБ с чанками по 1 ГБ: 100 чанков, по 10^9 элементов в каждом. Время O(n log n) = O(10^11 * 30) операций — десятки минут CPU. Память O(10^9) на чанк + O(100) на merge — RAM 8-16 ГБ достаточно.
Spark shuffle: как это работает в реальной системе
Когда вы делаете .groupBy("key").agg(...) в Spark, происходит shuffle:
- Map phase. Каждый worker разделяет свои данные на partitions по ключу (hash partitioner).
- Shuffle write. Worker сортирует свои partition’ы по ключу и пишет на диск.
- Shuffle read. Reducer-worker подтягивает свои partition’ы со всех map-workers.
- Merge. Reducer делает K-way merge подтянутых файлов по ключу, потом агрегирует.
Это тот же алгоритм, что мы только что разобрали, просто distributed. heapq.merge -> org.apache.spark.util.collection.ExternalSorter.
Когда merge не подходит
- Потоки не отсортированы. K-way merge требует отсортированных входов. Если потоки не отсортированы, нужно сначала отсортировать (увеличивает сложность).
- Очень много потоков. При K=10000 log K = 14, но overhead на 10000 file descriptors / connections может быть критичным. Решение — двухуровневый merge: сначала merge 100 потоков по 100, потом merge 100 «промежуточных».
- Дубликаты с дедупликацией. Если нужна дедупликация при merge — нужно отдельно обрабатывать (например, при равенстве выводить только один).
Замеры
Сравним merge через heapq и через sort:
import heapq
import time
import random
# 10 отсортированных потоков по 100к элементов
streams = []
for i in range(10):
s = sorted(random.randint(0, 10**9) for _ in range(100_000))
streams.append(s)
n = sum(len(s) for s in streams)
print(f"total elements: {n}")
# через heapq.merge
t0 = time.time()
merged = list(heapq.merge(*streams))
t_heap = time.time() - t0
print(f"heapq.merge: {t_heap:.2f} s")
# через concat + sort
all_data = []
for s in streams:
all_data.extend(s)
t0 = time.time()
all_data.sort()
t_sort = time.time() - t0
print(f"concat + sort: {t_sort:.2f} s")
Типичный результат: heapq.merge ~3 секунды, concat + sort ~5 секунд. Heap merge быстрее, и память O(K) против O(n). На потоковых данных heap merge — единственный вариант.
Если вы пишете ETL, который читает данные из нескольких отсортированных источников (например, partitioned Parquet с time-based key) — heapq.merge даст вам потоковую обработку с минимальной памятью. Это самый недооценённый инструмент в стандартной библиотеке Python.
Попробуй сам
Объедините несколько отсортированных файлов логов:
import heapq
import tempfile
import os
# создадим три файла с отсортированными timestamps
files = []
for chunk in [[1, 4, 7, 10], [2, 5, 8, 11], [3, 6, 9, 12]]:
fd, path = tempfile.mkstemp(suffix=".log")
os.close(fd)
with open(path, "w") as f:
for x in chunk:
f.write(f"{x}\n")
files.append(path)
# объединяем потоково
def file_ints(path):
with open(path) as f:
for line in f:
yield int(line.strip())
streams = [file_ints(p) for p in files]
merged = list(heapq.merge(*streams))
print(merged) # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
# cleanup
for p in files:
os.remove(p)
Это работает на файлах любого размера, не загружая их в память.
Merge join в PostgreSQL: K-way merge как основа соединения таблиц yield from: делегирование в генераторах Python