Зачем параллелить sort
На современных машинах 8-32 ядра — стандарт. Но обычный sorted() в Python использует одно ядро (из-за GIL и last-mile optimisation). Это значит — даже самые быстрые сортировки не масштабируются с числом ядер.
В DE-задачах это часто узкое место. Спарковский shuffle, postgres external sort, hadoop mapreduce — все используют параллельную сортировку, иначе на больших данных просто бы не закончили работу.
Этот урок — про идеи параллельной сортировки и про external sort (когда данные не помещаются в RAM).
Параллельный merge sort
Merge sort идеален для параллелизма. Идея:
- Разделить массив на k частей (k = число ядер).
- На каждом ядре сортируется одна часть (любым sequential алгоритмом).
- Слить k отсортированных частей в одну через k-way merge.
[массив n элементов]
v
┌────┴────┬────┬────┐
v v v v
ядро 1 ядро 2 ядро 3 ядро 4
sort sort sort sort
v v v v
[run1] [run2] [run3] [run4]
v v v v
k-way merge
v
[отсортировано]
Сложность:
- Sequential sort: O(n log n) (sequential).
- Parallel sort 1 (par_sort на k частей): O((n/k) log(n/k)) на каждом ядре — в k раз быстрее, если ядер хватает.
- Merge: O(n) sequential, либо O(n log k) с heap для k-way.
Общий speedup — до k-1 раз в идеале. На практике мешают:
- Memory bandwidth — ядра конкурируют за RAM.
- Cache pollution — соседние ядра выгоняют друг друга из cache.
- Coordination overhead.
В Python из-за GIL threading не даёт реального параллелизма для CPU-bound. Используют multiprocessing — но создание процессов и передача данных через pickle дороги. Эффективно работает только на больших массивах (>10М).
from multiprocessing import Pool
import numpy as np
def sort_chunk(chunk):
return sorted(chunk)
def parallel_sort(arr, n_workers=4):
chunk_size = len(arr) // n_workers
chunks = [arr[i:i+chunk_size] for i in range(0, len(arr), chunk_size)]
with Pool(n_workers) as pool:
sorted_chunks = pool.map(sort_chunk, chunks)
# k-way merge через heapq.merge
import heapq
return list(heapq.merge(*sorted_chunks))
import random
random.seed(0)
arr = [random.randint(0, 10**9) for _ in range(10**7)]
# для сравнения — sequential
import time
t = time.perf_counter()
sorted(arr)
print("sequential:", time.perf_counter() - t)
t = time.perf_counter()
parallel_sort(arr, n_workers=4)
print("parallel:", time.perf_counter() - t)
Типичный результат на 4-core x86:
- sequential: 5 секунд,
- parallel: 2-3 секунды.
Speedup 2-2.5x, не 4x — мешают накладные расходы Python multiprocessing.
В numpy/C это бы работало эффективнее, но numpy.sort() сам по себе single-threaded.
Параллельный quicksort: труднее
Quicksort тоже можно параллелить — partition разделяет массив на две части, их сортируем параллельно. Но:
- Partition не параллелится легко — это последовательный проход.
- Дисбаланс — если pivot плохой, одна часть намного больше другой. Тогда одно ядро занято, второе простаивает.
- Глубина рекурсии — на первых уровнях рекурсии можно параллелить, на нижних — bundle становится мелким, overhead превышает выгоду.
Производственные libs делают threshold-based параллелизм: разделяем параллельно пока размер >= 10^5, ниже — sequentially. Это balance между параллелизмом и overhead.
В Java Arrays.parallelSort() использует именно такую схему: параллельный quicksort с threshold, fallback на merge sort.
External sort: данные больше RAM
Если массив 200 ГБ, а RAM 64 ГБ — sort в памяти невозможен. Используется external sort:
- Chunk и sort. Читать файл блоками по chunk_size (например, 10 ГБ). Каждый блок сортировать в памяти (быстро, любым sort’ом). Записать как «run» во временный файл.
- Merge runs. Открыть все run-файлы, использовать k-way merge через priority queue. Читать по одной записи из каждого, выдавать минимум.
import heapq
import tempfile
import os
def external_sort(input_path: str, output_path: str, chunk_size: int = 10_000_000):
"""Сортирует огромный файл int'ов внешней сортировкой."""
# 1) разбить и отсортировать
chunks = []
chunk = []
with open(input_path) as f:
for line in f:
chunk.append(int(line))
if len(chunk) >= chunk_size:
chunk.sort()
tf = tempfile.NamedTemporaryFile(mode="w", delete=False)
tf.writelines(f"{x}\n" for x in chunk)
tf.close()
chunks.append(tf.name)
chunk = []
if chunk:
chunk.sort()
tf = tempfile.NamedTemporaryFile(mode="w", delete=False)
tf.writelines(f"{x}\n" for x in chunk)
tf.close()
chunks.append(tf.name)
# 2) k-way merge через heapq.merge
files = [open(c) for c in chunks]
streams = [(int(line) for line in f) for f in files]
with open(output_path, "w") as out:
for x in heapq.merge(*streams):
out.write(f"{x}\n")
for f in files:
f.close()
for c in chunks:
os.unlink(c)
Сложность external sort:
- Sort: O(n log chunk_size) на каждый chunk, всего O(n log chunk_size).
- Merge: O(n log k), где k — число chunks.
Total O(n log chunk_size + n log k) = O(n log n) (при разумных chunk_size).
Memory peak: один chunk + один элемент из каждого stream = O(chunk_size + k). Можно работать с любыми n при фиксированной RAM.
External sort в Postgres и Spark
Postgres ORDER BY на большом наборе: если результат не помещается в work_mem, используется external sort. Postgres пишет run’ы в pgsql_tmp, потом merge’ит.
Spark sortBy / orderBy: разбивает данные на партиции, сортирует каждую в памяти, потом shuffle-and-merge на reducer-нодах. Это эффективно distributed external sort.
Hadoop: аналогично — map-side sort, shuffle, reduce-side merge.
Без external sort не работают ни OLAP-системы, ни любая обработка больших данных.
K-way merge: heap всему голова
В k-way merge самая важная структура — min-heap на k stream’ов. Каждый stream — это итератор по отсортированному файлу.
import heapq
def k_way_merge(streams):
"""streams — список iterator'ов по отсортированным последовательностям."""
pq = []
# инициализация: первый элемент каждого stream'а в heap
for i, stream in enumerate(streams):
try:
val = next(stream)
heapq.heappush(pq, (val, i))
except StopIteration:
pass
# пока heap не пуст — выдаём минимум и продвигаем stream
while pq:
val, i = heapq.heappop(pq)
yield val
try:
next_val = next(streams[i])
heapq.heappush(pq, (next_val, i))
except StopIteration:
pass
Сложность: каждая запись проходит через heap дважды (push + pop), каждая операция O(log k). Total O(n log k).
heapq.merge в Python — это, по сути, тот же алгоритм:
import heapq
list(heapq.merge([1, 4, 7], [2, 3, 8], [0, 5, 6]))
# [0, 1, 2, 3, 4, 5, 6, 7, 8]
Эффективное, lazy, потоковое — не загружает все streams в память сразу.
GIL и реальный параллелизм в Python
Главное препятствие для параллельного sort в Python —
Решения:
- multiprocessing — несколько процессов, каждый со своим GIL. Минус — pickle/unpickle при передаче данных.
- numpy C-extensions — numpy sort внутри C, освобождает GIL во время работы. Можно запускать parallel через ThreadPoolExecutor.
- PyPy / Cython — компиляция Python-кода в нативный, GIL обходится.
В Python 3.13 появилась опция отключить GIL (PEP 703). Это в будущем сделает threading-based parallel sort реальным. Пока — multiprocessing.
DE-кейс: distributed sort в Spark
Spark df.sort("ts") под капотом:
- Shuffle write: на map-стороне данные сортируются локально (Timsort/quicksort), бьются на партиции, записываются на диск.
- Shuffle read: на reduce-стороне партиции читаются из всех map’ов и сливаются (k-way merge с heap).
- Final sort: если нужен total order, добавляется глобальная сортировка партиций.
Это классическое external + parallel sort, distributed across cluster. Понимание этих принципов помогает читать Spark UI и оптимизировать pipeline.
Попробуй сам
from multiprocessing import Pool
import heapq
import random
import time
random.seed(42)
N = 5 * 10**6
arr = [random.randint(0, 10**9) for _ in range(N)]
# sequential
t = time.perf_counter()
sorted(arr)
print(f"sequential: {time.perf_counter() - t:.3f}s")
# parallel с 4 workers
def sort_chunk(chunk):
return sorted(chunk)
n_workers = 4
chunk_size = N // n_workers
chunks = [arr[i*chunk_size:(i+1)*chunk_size] for i in range(n_workers)]
t = time.perf_counter()
with Pool(n_workers) as pool:
sorted_chunks = pool.map(sort_chunk, chunks)
result = list(heapq.merge(*sorted_chunks))
print(f"parallel ({n_workers}): {time.perf_counter() - t:.3f}s")
Ожидаемое (4-core x86):
- sequential: 2-3 с,
- parallel: 1-1.5 с (speedup ~2x; не 4x из-за multiprocessing overhead и memory bandwidth).
Speedup лучше на больших массивах — overhead amortизируется.
В следующем уроке — benchmarking sorts и реальные числа на разных формах данных. Все теоретические утверждения мы подтвердим измерениями.
GIL, threading vs multiprocessing Потоки vs процессы: что общего, что разного