От O(n^2) к O(n log n)
Простые сортировки делают n^2 операций — для 10^4 элементов это 10^8 операций, единицы секунд. Для 10^6 — 10^12 операций, часы. Это неприемлемо для DE-задач, где сортировка миллионов записей — норма.
Решение — алгоритмы O(n log n). Их два «классических»:
- Merge sort — гарантированно O(n log n), стабилен, требует O(n) дополнительной памяти,
- Quicksort — average O(n log n), worst O(n^2), in-place, не стабилен.
Этот урок — про первый. Следующий — про второй.
Идея: divide and conquer
Merge sort использует принцип
- Divide: разделить массив на две половины.
- Conquer: рекурсивно отсортировать каждую половину.
- Combine: слить две отсортированные половины в одну отсортированную.
Шаг 3 — слияние (merge) — главный трюк: при наличии двух уже отсортированных половин слить их в одну отсортированную можно за O(n).
def merge_sort(arr: list[int]) -> list[int]:
if len(arr) <= 1:
return arr
mid = len(arr) // 2
left = merge_sort(arr[:mid])
right = merge_sort(arr[mid:])
return merge(left, right)
def merge(left: list[int], right: list[int]) -> list[int]:
result = []
i = j = 0
while i < len(left) and j < len(right):
if left[i] <= right[j]: # <= ВАЖНО для стабильности
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
Запуск на [5, 2, 8, 1, 9, 3, 7, 4]:
[5, 2, 8, 1, 9, 3, 7, 4]
/ \
[5, 2, 8, 1] [9, 3, 7, 4]
/ \ / \
[5,2] [8,1] [9,3] [7,4]
⇣ ⇣ ⇣ ⇣
[2,5] [1,8] [3,9] [4,7]
↘ ↙ ↘ ↙
[1,2,5,8] [3,4,7,9]
↘ ↙
[1,2,3,4,5,7,8,9]
Каждый уровень — это слияние всех пар: всего log n уровней, на каждом O(n) работы суммарно. Итого O(n log n).
Половины делятся рекурсивно до длины 1, потом сливаются.
Слияние подробно
Самая интересная часть — функция merge. У нас два отсортированных массива. Идея:
- Два указателя:
iдля left,jдля right. - На каждом шаге берём наименьший из
left[i]иright[j], добавляем в результат, продвигаем указатель. - Когда один из массивов исчерпан, дописываем хвост другого.
def merge(left, right):
result = []
i = j = 0
while i < len(left) and j < len(right):
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
print(merge([1, 4, 6], [2, 3, 5]))
# [1, 2, 3, 4, 5, 6]
Каждый элемент берётся один раз. Сложность merge — O(n+m) = O(n).
Главное: <= (не <) в проверке. Если left[i] == right[j], берём из левого. Это гарантирует, что равные элементы сохраняют относительный порядок — stable sort.
Стабильность
Stable sort — это сортировка, сохраняющая относительный порядок равных элементов. То есть если два элемента равны по ключу сортировки, тот, что был раньше в исходном массиве, останется раньше в отсортированном.
Зачем это:
- Сортировка по нескольким ключам. Сначала по name, потом по age — если name одинаков, исходный порядок (по age) сохранится. Это даёт многоуровневую сортировку через последовательные вызовы.
- DataFrame-сортировки. pandas.sort_values использует Timsort (stable), что важно для воспроизводимости при сортировке по подмножеству колонок.
Merge sort стабилен. Quicksort — нет. Это иногда определяющий фактор выбора.
Подробнее это в уроке 14.02.
Сложность
Гарантия O(n log n) в любом случае. Память — O(n) дополнительной.
Время — гарантированно O(n log n). Никаких worst case. Это плюс по сравнению с quicksort.
Память — O(n). Это минус. Каждый уровень рекурсии требует временный массив для merge. Суммарно O(n) дополнительной памяти. На большом массиве может быть проблемой.
Не in-place. Существует in-place merge sort, но он сложен и обычно медленнее обычного.
Стабилен. Главное конкурентное преимущество перед quicksort.
Доказательство O(n log n) через рекуррентность
Время сортировки массива длины n:
T(n) = 2 * T(n/2) + O(n)
⎮ ⎮
⎮ ⎮ слияние двух половин — O(n)
⎮ рекурсивные вызовы на половины
Раскрываем:
- Уровень 0: 1 вызов с массивом n,
- Уровень 1: 2 вызова с массивами n/2,
- Уровень 2: 4 вызова с n/4,
- …
- Уровень k: 2^k вызовов с n/2^k.
На каждом уровне суммарная работа merge = O(n) (массивы все вместе содержат n элементов). Уровней — log_2(n). Итого O(n log n).
Это мастер-теорема в действии: T(n) = a * T(n/b) + O(n^d), здесь a=2, b=2, d=1 -> log_b(a) = 1 = d -> T(n) = O(n^d log n) = O(n log n).
In-place merge — почему не делают
Можно ли merge sort реализовать без O(n) дополнительной памяти? Существует in-place merge sort, но его константа очень высокая (медленнее обычного в 2-3 раза). На практике предпочитают:
- Стандартный merge sort (O(n) extra memory) — для больших массивов в памяти,
- Quicksort (O(log n) для рекурсии, O(1) extra) — когда память критична,
- Timsort (O(n) extra, но adaptive) — когда массив частично отсортирован.
Параллельный merge sort
Merge sort легко параллелится: левую и правую половины сортируем независимо, потом сливаем. На многоядерной машине это даёт почти линейное ускорение.
from concurrent.futures import ProcessPoolExecutor
def merge_sort_parallel(arr, level=0, max_level=2):
if len(arr) <= 1:
return arr
if level >= max_level:
return merge_sort(arr)
mid = len(arr) // 2
with ProcessPoolExecutor(max_workers=2) as ex:
left_fut = ex.submit(merge_sort_parallel, arr[:mid], level+1, max_level)
right_fut = ex.submit(merge_sort_parallel, arr[mid:], level+1, max_level)
left = left_fut.result()
right = right_fut.result()
return merge(left, right)
max_level ограничивает глубину параллелизма (создание процессов дорогое; ниже определённого размера лучше переключиться на последовательный). Подробно в 14.04.
External merge sort: сортировка файлов
Файл 100 ГБ не помещается в RAM 16 ГБ. Стандарт сортировки — external merge sort:
- Читаем файл блоками по 1-2 ГБ.
- Каждый блок сортируем в памяти (любым алгоритмом), пишем на диск как «run».
- Открываем несколько runs одновременно, сливаем
merge-функцией, пишем в выходной файл. - Если runs больше чем умеет одно слияние — несколько проходов.
def external_sort(input_path: str, output_path: str, chunk_size: int = 10_000_000):
"""Сортирует огромный файл целых чисел через external merge sort."""
import heapq
import tempfile
import os
# 1) разбить на отсортированные runs
chunk = []
runs = []
with open(input_path) as f:
for line in f:
chunk.append(int(line))
if len(chunk) >= chunk_size:
chunk.sort()
tf = tempfile.NamedTemporaryFile(delete=False, mode='w')
for x in chunk:
tf.write(f"{x}\n")
tf.close()
runs.append(tf.name)
chunk = []
if chunk:
chunk.sort()
tf = tempfile.NamedTemporaryFile(delete=False, mode='w')
for x in chunk:
tf.write(f"{x}\n")
tf.close()
runs.append(tf.name)
# 2) k-way merge через heapq.merge
files = [open(r) for r in runs]
streams = [(int(line) for line in f) for f in files]
with open(output_path, 'w') as out:
for x in heapq.merge(*streams):
out.write(f"{x}\n")
for f in files: f.close()
for r in runs: os.unlink(r)
heapq.merge — встроенная функция, сливающая несколько отсортированных streams. Это и есть merge с k>2.
External merge sort — основа ORDER BY в Postgres (когда work_mem не хватает) и в Spark.
Merge join в SQL
Один из главных join-алгоритмов баз данных. Идея:
- Сортируем обе таблицы по ключу join.
- Идём по обеим линейно с двумя указателями, как в merge.
Без сортировок merge join — это O(n + m). С сортировкой — O(n log n + m log m + n + m). На больших таблицах это часто быстрее hash join (особенно когда нужен ORDER BY на выходе или когда обе таблицы уже отсортированы по ключу — например, индексированы).
DE-кейс: сортировка ETL-выгрузки
Сценарий: у вас выгрузка 100М событий из API, нужно отсортировать по timestamp перед записью в Parquet с правильным row group order.
Решение:
- Если помещается в RAM:
sorted(events, key=lambda e: e["ts"])— Timsort O(n log n), несколько минут на 100М. - Если не помещается: external merge sort. Pandas + chunks:
pd.read_csv(..., chunksize=1_000_000), сортировать каждый chunk, писать как Parquet, потом сливать черезpyarrow.compute.sort_indicesилиpolars.
Polars/DuckDB делают external sort прозрачно: вы пишете .sort() или ORDER BY, движок сам решает, помещается ли в память.
Попробуй сам
import random
import time
def merge_sort(arr):
if len(arr) <= 1:
return arr
mid = len(arr) // 2
return merge(merge_sort(arr[:mid]), merge_sort(arr[mid:]))
def merge(left, right):
result = []
i = j = 0
while i < len(left) and j < len(right):
if left[i] <= right[j]:
result.append(left[i]); i += 1
else:
result.append(right[j]); j += 1
result.extend(left[i:]); result.extend(right[j:])
return result
random.seed(42)
arr = [random.randint(0, 10**9) for _ in range(100_000)]
t = time.perf_counter(); merge_sort(arr[:]); print("merge:", time.perf_counter() - t)
t = time.perf_counter(); sorted(arr[:]); print("Timsort:", time.perf_counter() - t)
Ожидаемое:
- merge: 0.4-0.8 с,
- Timsort (через
sorted): 0.04-0.08 с — в 10 раз быстрее.
Причины:
- Timsort на C, наш merge на Python — основная разница.
- Timsort adaptive — на частично-отсортированных данных он быстрее.
- Cache — у Timsort лучше из-за гибридной структуры.
В следующем уроке — quicksort, второй кит O(n log n)-сортировок.
Merge Join в PostgreSQL: sort + zip yield from для рекурсивных генераторов — merge sort как generator