Learning Platform
Глоссарий Troubleshooting
Урок 16.04 · 28 мин
Начальный
parallel sortmultiprocessingexternal sortmerge sortGIL

Зачем параллелить sort

На современных машинах 8-32 ядра — стандарт. Но обычный sorted() в Python использует одно ядро (из-за GIL и last-mile optimisation). Это значит — даже самые быстрые сортировки не масштабируются с числом ядер.

В DE-задачах это часто узкое место. Спарковский shuffle, postgres external sort, hadoop mapreduce — все используют параллельную сортировку, иначе на больших данных просто бы не закончили работу.

Этот урок — про идеи параллельной сортировки и про external sort (когда данные не помещаются в RAM).

Параллельный merge sort

Merge sort идеален для параллелизма. Идея:

  1. Разделить массив на k частей (k = число ядер).
  2. На каждом ядре сортируется одна часть (любым sequential алгоритмом).
  3. Слить k отсортированных частей в одну через k-way merge.
[массив n элементов]
       v
  ┌────┴────┬────┬────┐
  v         v    v    v
 ядро 1   ядро 2 ядро 3 ядро 4
 sort     sort   sort   sort
  v         v    v    v
[run1]   [run2] [run3] [run4]
        v v v v
       k-way merge
          v
  [отсортировано]

Сложность:

  • Sequential sort: O(n log n) (sequential).
  • Parallel sort 1 (par_sort на k частей): O((n/k) log(n/k)) на каждом ядре — в k раз быстрее, если ядер хватает.
  • Merge: O(n) sequential, либо O(n log k) с heap для k-way.

Общий speedup — до k-1 раз в идеале. На практике мешают:

  • Memory bandwidth — ядра конкурируют за RAM.
  • Cache pollution — соседние ядра выгоняют друг друга из cache.
  • Coordination overhead.

В Python из-за GIL threading не даёт реального параллелизма для CPU-bound. Используют multiprocessing — но создание процессов и передача данных через pickle дороги. Эффективно работает только на больших массивах (>10М).

from multiprocessing import Pool
import numpy as np

def sort_chunk(chunk):
    return sorted(chunk)

def parallel_sort(arr, n_workers=4):
    chunk_size = len(arr) // n_workers
    chunks = [arr[i:i+chunk_size] for i in range(0, len(arr), chunk_size)]
    with Pool(n_workers) as pool:
        sorted_chunks = pool.map(sort_chunk, chunks)
    # k-way merge через heapq.merge
    import heapq
    return list(heapq.merge(*sorted_chunks))

import random
random.seed(0)
arr = [random.randint(0, 10**9) for _ in range(10**7)]

# для сравнения — sequential
import time
t = time.perf_counter()
sorted(arr)
print("sequential:", time.perf_counter() - t)

t = time.perf_counter()
parallel_sort(arr, n_workers=4)
print("parallel:", time.perf_counter() - t)

Типичный результат на 4-core x86:

  • sequential: 5 секунд,
  • parallel: 2-3 секунды.

Speedup 2-2.5x, не 4x — мешают накладные расходы Python multiprocessing.

В numpy/C это бы работало эффективнее, но numpy.sort() сам по себе single-threaded.

Параллельный quicksort: труднее

Quicksort тоже можно параллелить — partition разделяет массив на две части, их сортируем параллельно. Но:

  1. Partition не параллелится легко — это последовательный проход.
  2. Дисбаланс — если pivot плохой, одна часть намного больше другой. Тогда одно ядро занято, второе простаивает.
  3. Глубина рекурсии — на первых уровнях рекурсии можно параллелить, на нижних — bundle становится мелким, overhead превышает выгоду.

Производственные libs делают threshold-based параллелизм: разделяем параллельно пока размер >= 10^5, ниже — sequentially. Это balance между параллелизмом и overhead.

В Java Arrays.parallelSort() использует именно такую схему: параллельный quicksort с threshold, fallback на merge sort.

External sort: данные больше RAM

Если массив 200 ГБ, а RAM 64 ГБ — sort в памяти невозможен. Используется external sort:

  1. Chunk и sort. Читать файл блоками по chunk_size (например, 10 ГБ). Каждый блок сортировать в памяти (быстро, любым sort’ом). Записать как «run» во временный файл.
  2. Merge runs. Открыть все run-файлы, использовать k-way merge через priority queue. Читать по одной записи из каждого, выдавать минимум.
import heapq
import tempfile
import os

def external_sort(input_path: str, output_path: str, chunk_size: int = 10_000_000):
    """Сортирует огромный файл int'ов внешней сортировкой."""
    # 1) разбить и отсортировать
    chunks = []
    chunk = []
    with open(input_path) as f:
        for line in f:
            chunk.append(int(line))
            if len(chunk) >= chunk_size:
                chunk.sort()
                tf = tempfile.NamedTemporaryFile(mode="w", delete=False)
                tf.writelines(f"{x}\n" for x in chunk)
                tf.close()
                chunks.append(tf.name)
                chunk = []
        if chunk:
            chunk.sort()
            tf = tempfile.NamedTemporaryFile(mode="w", delete=False)
            tf.writelines(f"{x}\n" for x in chunk)
            tf.close()
            chunks.append(tf.name)

    # 2) k-way merge через heapq.merge
    files = [open(c) for c in chunks]
    streams = [(int(line) for line in f) for f in files]
    with open(output_path, "w") as out:
        for x in heapq.merge(*streams):
            out.write(f"{x}\n")

    for f in files:
        f.close()
    for c in chunks:
        os.unlink(c)

Сложность external sort:

  • Sort: O(n log chunk_size) на каждый chunk, всего O(n log chunk_size).
  • Merge: O(n log k), где k — число chunks.

Total O(n log chunk_size + n log k) = O(n log n) (при разумных chunk_size).

Memory peak: один chunk + один элемент из каждого stream = O(chunk_size + k). Можно работать с любыми n при фиксированной RAM.

External sort в Postgres и Spark

Postgres ORDER BY на большом наборе: если результат не помещается в work_mem, используется external sort. Postgres пишет run’ы в pgsql_tmp, потом merge’ит.

Spark sortBy / orderBy: разбивает данные на партиции, сортирует каждую в памяти, потом shuffle-and-merge на reducer-нодах. Это эффективно distributed external sort.

Hadoop: аналогично — map-side sort, shuffle, reduce-side merge.

Без external sort не работают ни OLAP-системы, ни любая обработка больших данных.

K-way merge: heap всему голова

В k-way merge самая важная структура — min-heap на k stream’ов. Каждый stream — это итератор по отсортированному файлу.

import heapq

def k_way_merge(streams):
    """streams — список iterator'ов по отсортированным последовательностям."""
    pq = []
    # инициализация: первый элемент каждого stream'а в heap
    for i, stream in enumerate(streams):
        try:
            val = next(stream)
            heapq.heappush(pq, (val, i))
        except StopIteration:
            pass
    # пока heap не пуст — выдаём минимум и продвигаем stream
    while pq:
        val, i = heapq.heappop(pq)
        yield val
        try:
            next_val = next(streams[i])
            heapq.heappush(pq, (next_val, i))
        except StopIteration:
            pass

Сложность: каждая запись проходит через heap дважды (push + pop), каждая операция O(log k). Total O(n log k).

heapq.merge в Python — это, по сути, тот же алгоритм:

import heapq
list(heapq.merge([1, 4, 7], [2, 3, 8], [0, 5, 6]))
# [0, 1, 2, 3, 4, 5, 6, 7, 8]

Эффективное, lazy, потоковое — не загружает все streams в память сразу.

GIL и реальный параллелизм в Python

Главное препятствие для параллельного sort в Python —

GIL
. CPU-bound операции (включая обычный sort) на нескольких threads не ускоряются.

Решения:

  1. multiprocessing — несколько процессов, каждый со своим GIL. Минус — pickle/unpickle при передаче данных.
  2. numpy C-extensions — numpy sort внутри C, освобождает GIL во время работы. Можно запускать parallel через ThreadPoolExecutor.
  3. PyPy / Cython — компиляция Python-кода в нативный, GIL обходится.

В Python 3.13 появилась опция отключить GIL (PEP 703). Это в будущем сделает threading-based parallel sort реальным. Пока — multiprocessing.

DE-кейс: distributed sort в Spark

Spark df.sort("ts") под капотом:

  1. Shuffle write: на map-стороне данные сортируются локально (Timsort/quicksort), бьются на партиции, записываются на диск.
  2. Shuffle read: на reduce-стороне партиции читаются из всех map’ов и сливаются (k-way merge с heap).
  3. Final sort: если нужен total order, добавляется глобальная сортировка партиций.

Это классическое external + parallel sort, distributed across cluster. Понимание этих принципов помогает читать Spark UI и оптимизировать pipeline.

Попробуй сам

from multiprocessing import Pool
import heapq
import random
import time

random.seed(42)
N = 5 * 10**6
arr = [random.randint(0, 10**9) for _ in range(N)]

# sequential
t = time.perf_counter()
sorted(arr)
print(f"sequential: {time.perf_counter() - t:.3f}s")

# parallel с 4 workers
def sort_chunk(chunk):
    return sorted(chunk)

n_workers = 4
chunk_size = N // n_workers
chunks = [arr[i*chunk_size:(i+1)*chunk_size] for i in range(n_workers)]

t = time.perf_counter()
with Pool(n_workers) as pool:
    sorted_chunks = pool.map(sort_chunk, chunks)
result = list(heapq.merge(*sorted_chunks))
print(f"parallel ({n_workers}): {time.perf_counter() - t:.3f}s")

Ожидаемое (4-core x86):

  • sequential: 2-3 с,
  • parallel: 1-1.5 с (speedup ~2x; не 4x из-за multiprocessing overhead и memory bandwidth).

Speedup лучше на больших массивах — overhead amortизируется.

В следующем уроке — benchmarking sorts и реальные числа на разных формах данных. Все теоретические утверждения мы подтвердим измерениями.

GIL, threading vs multiprocessing Потоки vs процессы: что общего, что разного
Проверка знанийKnowledge check
Нужно отсортировать файл размером 500 ГБ на машине с 32 ГБ RAM. Опишите external sort pipeline и сколько примерно времени он займёт.
ОтветAnswer
External sort в два прохода. (1) Chunk-and-sort: читаем файл блоками по 20-25 ГБ, сортируем в RAM любым in-place алгоритмом (numpy quicksort или Timsort если позволяет память; реально лучше 10-15 ГБ для запаса), пишем как отсортированные runs во временные файлы — получаем ~25-50 runs. Время на этом шаге — ~5-10 минут (1-2 ГБ/с чтение + 500 МБ/с sort на CPU + 1 ГБ/с запись). (2) K-way merge: открываем все runs одновременно, используем min-heap (heapq.merge) для слияния streams, пишем в выходной файл. Время — ~10-15 минут (читаем 500 ГБ потоково, merge с overhead O(log k) на запись). Total ~20-25 минут при единственном CPU; с параллелизмом первый шаг (sort chunks на разных ядрах) ускоряется в 4-8 раз. Это база ORDER BY в Postgres external sort и Spark shuffle.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 6. Почему merge sort легче параллелится, чем quicksort?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5