Learning Platform
Глоссарий Troubleshooting
Урок 03.04 · 28 мин
Продвинутый
PartitionerHashPartitionerRangePartitionerCo-partitioningShuffle elimination

Партиционеры

Партиционер — один из пяти контрактных свойств RDD, но в production-коде он влияет на производительность больше остальных четырёх вместе взятых. Правильный партиционер устраняет shuffle. Неправильный — удваивает время выполнения. Полное отсутствие партиционера там, где он должен быть, — источник большинства «внезапных» performance regression в Spark.

Что такое Partitioner

Partitioner в Spark — абстрактный класс с двумя обязательными методами:

// org.apache.spark.Partitioner
abstract class Partitioner extends Serializable {
  def numPartitions: Int            // сколько партиций в выходном RDD
  def getPartition(key: Any): Int   // ключ -> номер партиции [0, numPartitions)
}

getPartition должен быть детерминированным: один и тот же ключ всегда должен давать один и тот же номер партиции. Только тогда co-partitioning работает корректно.

Партиционер устанавливается через поле RDD.partitioner: Option[Partitioner]. None означает, что партиционирование неизвестно — data layout непредсказуем. Это сигнал для DAGScheduler: если делаем join или groupByKey, нужен shuffle.

HashPartitioner: дефолт для key-value операций

HashPartitioner(numPartitions) — партиционер по умолчанию для groupByKey, reduceByKey, aggregateByKey, join. Логика проста:

class HashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
    // nonNegativeMod обрабатывает отрицательные hashCode:
    // Math.floorMod(key.hashCode, numPartitions) -- в Java 8+
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner => h.numPartitions == numPartitions
    case _ => false
  }
}

Сравнение партиционеров через equals — критически важно. Именно equals используется при проверке co-partitioning: два RDD считаются co-partitioned, если их партиционеры равны по equals. HashPartitioner(200).equals(HashPartitioner(200))true. HashPartitioner(200).equals(HashPartitioner(201))false, и join потребует нового shuffle.

Проблема с HashPartitioner: распределение зависит от hashCode ключей. Если данные имеют data skew (много записей с одним ключом), одна партиция будет содержать значительно больше данных остальных. Типичный пример — groupByKey по country_code с 80% данных из США.

RangePartitioner: для упорядоченных данных

RangePartitioner используется при sortByKey и sortBy. Он обеспечивает глобальную сортировку: данные в партиции i всегда «меньше» данных в партиции i+1 по порядку сортировки.

Проблема с RangePartitioner: для правильного разбиения на диапазоны нужно знать распределение данных. Нельзя просто сказать «разбей на 200 равных диапазонов» без понимания того, в каком диапазоне лежат значения.

Spark решает это через выборку (sampling):

// Упрощённый алгоритм RangePartitioner
class RangePartitioner[K: Ordering: ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20  // Spark 4.0 default
) extends Partitioner {

  // 1. Выборка данных из rdd
  private val (rangeBounds: Array[K], formatter: Any => K) = {
    val sampleSize = math.min(samplePointsPerPartitionHint.toLong * partitions, 1e6.toLong)
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt

    // sketch-sampling: каждая партиция выбирает sampleSizePerPartition элементов
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

    // 2. determineBounds: по выборке строим границы диапазонов
    val candidates = sketched.flatMap { case (idx, n, sample) =>
      // взвешиваем sample по реальному размеру партиции
      val weight = (n.toDouble / sample.length).toFloat
      sample.map(x => (x, weight))
    }
    RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.length))
  }

  override def getPartition(key: Any): Int = {
    // бинарный поиск по rangeBounds
    val k = formatter(key)
    var partition = 0
    if (rangeBounds.length <= 128) {
      // linear scan для малого числа диапазонов
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // бинарный поиск для большого числа
      partition = binarySearch(rangeBounds, k)
    }
    if (ascending) partition else rangeBounds.length - partition
  }
}

Ключевое: RangePartitioner при инициализации запускает Actionrdd.map(_._1).sample(false, fraction).collect(). Это означает, что sortByKey запускает два Job: первый — сбор выборки для построения границ, второй — фактическая сортировка с распределением по диапазонам. В Spark UI это будет выглядеть как два Job для одного sortByKey.

Важный параметр: samplePointsPerPartitionHint = 20 (default). Для 200 партиций Spark выберет min(20 * 200, 1_000_000) = 4000 ключей для построения границ. Если данные нестандартно распределены, это может дать плохие границы и data skew. Увеличение до 100-200 даёт точнее границы, но увеличивает overhead выборки.

RangePartitioner: sampling и границы диапазонов
Шаг 1: Samplingsketch(): каждая partition выбирает sampleSizePerPartition ключей. Взвешивание по реальному размеру partition — маленькие partitions имеют больший вес, чтобы не занижать их вклад.
Шаг 2: determineBoundsПо weighted sample строим N-1 границ для N партиций. Границы выбираются так, чтобы примерно одинаковое число (взвешенных) ключей попало в каждый диапазон.
Шаг 3: getPartitionБинарный поиск по rangeBounds. O(log(numPartitions)). Для менее 128 партиций — линейный scan (cache-friendly). Результат: номер партиции [0, N).

Custom Partitioner

Стандартные партиционеры не всегда подходят. Типичные сценарии для custom:

  • Географическое партиционирование: данные по странам, каждая страна — своя партиция.
  • Временное партиционирование: данные по дням/часам, партиция = день.
  • Domain-specific affinity: записи одного клиента должны быть в одной партиции.
# PySpark custom partitioner
from pyspark import Partitioner

class CountryPartitioner(Partitioner):
    COUNTRIES = ["US", "GB", "DE", "FR", "JP", "CN", "BR", "IN", "AU", "OTHER"]

    def __init__(self):
        super().__init__()

    def numPartitions(self):
        return len(self.COUNTRIES)

    def getPartition(self, key):
        country = key.upper() if key else "OTHER"
        if country in self.COUNTRIES:
            return self.COUNTRIES.index(country)
        return self.COUNTRIES.index("OTHER")

# Использование
orders = sc.parallelize([
    ("US", 100), ("GB", 200), ("DE", 150), ("US", 300),
    ("XX", 50),  ("JP", 400), ("CN", 600), ("BR", 80)
])

partitioned = orders.partitionBy(CountryPartitioner())
print(f"Число партиций: {partitioned.getNumPartitions()}")  # 10

# Проверим, что US всегда в одной партиции
us_partition = partitioned.partitioner.getPartition("US")
print(f"US в партиции: {us_partition}")  # 0

# Данные из одной страны в одной партиции - join без shuffle!
customer_data = orders.filter(lambda x: x[0] in ["US", "GB"]) \
                      .partitionBy(CountryPartitioner())

joined = partitioned.join(customer_data)  # Нет shuffle! Co-partitioned
print(joined.toDebugString().decode())
# Не должно быть ShuffledRDD

В Scala custom Partitioner реализует абстрактный класс org.apache.spark.Partitioner:

class TimeBasedPartitioner(val numPartitions: Int, epochSeconds: Long => Int)
    extends Partitioner {

  override def getPartition(key: Any): Int = key match {
    case ts: Long =>
      val slot = epochSeconds(ts)  // функция для маппинга timestamp -> slot
      Utils.nonNegativeMod(slot, numPartitions)
    case _ => 0
  }

  override def equals(other: Any): Boolean = other match {
    case p: TimeBasedPartitioner =>
      p.numPartitions == numPartitions && p.epochSeconds == epochSeconds
    case _ => false
  }

  override def hashCode(): Int = numPartitions  // обязателен вместе с equals!
}

Критически важно: если переопределяете equals, обязательно переопределяйте и hashCode. Иначе co-partitioning не будет работать корректно в HashMap-lookups внутри DAGScheduler.

Co-partitioning: устранение shuffle в join

Главный производственный приём с партиционерами — co-partitioning: сохранить оба входных RDD с одинаковым партиционером, и тогда join не потребует shuffle.

# Проблема: два дорогих shuffle при join
orders_by_customer = orders.groupByKey(200)     # shuffle 1: HashPartitioner(200)
customers_enriched = customers.groupByKey(200)  # shuffle 2: HashPartitioner(200)
# join увидит два RDD с HashPartitioner(200) -- они равны по equals()!
# НЕТ третьего shuffle
result = orders_by_customer.join(customers_enriched)  # Co-partitioned join!

# Но если один из них был repartitioned по-другому:
orders_100 = orders.groupByKey(100)    # HashPartitioner(100)
customers_200 = customers.groupByKey(200)  # HashPartitioner(200)
# НЕ равны -> будет shuffle при join
bad_result = orders_100.join(customers_200)  # <-- ShuffleDependency, лишний shuffle

Правило: для co-partitioned join нужно не просто «одинаковое число партиций», а одинаковый партиционер (по equals). HashPartitioner(200).equals(HashPartitioner(200)) — true. Разные типы партиционеров — никогда не равны.

Практический паттерн для production-пайплайнов с многократным join:

# Партиционируем оба датасета ОДИН РАЗ и кешируем
PARTITIONS = 400

orders_p = orders.map(lambda r: (r.customer_id, r)) \
                 .partitionBy(HashPartitioner(PARTITIONS)) \
                 .persist()  # кешируем, чтобы использовать многократно

customers_p = customers.map(lambda r: (r.id, r)) \
                       .partitionBy(HashPartitioner(PARTITIONS)) \
                       .persist()

# Все последующие join-ы будут без shuffle
result1 = orders_p.join(customers_p)                    # без shuffle
result2 = orders_p.join(customers_p).join(payments_p)  # может быть без shuffle,
# если payments_p тоже HashPartitioner(PARTITIONS)

Partition pruning

Partition pruning — оптимизация, при которой Spark не читает партиции, которые не содержат нужных данных. Это работает через метаданные, а не через сам Partitioner.

Для Hive-таблиц с partition columns (папки вида year=2024/month=01):

df = spark.read.parquet("hdfs:///data/events/")
# В Hive-метаданных зарегистрировано: партиции year=2023, year=2024, year=2025

result = df.filter(df.year == 2024)
result.explain("formatted")
# PartitionFilters: [isnotnull(year#0), (year#0 = 2024)]
# В FileScan будет: только директория year=2024/ — pruning!

Для RDD с кастомным партиционером pruning не работает автоматически — нужно вручную ограничивать partitions:

# Вместо полного scan + filter:
all_us_data = partitioned_by_country.filter(lambda kv: kv[0] == "US")
# Это создаёт MapPartitionsRDD по ВСЕМ партициям (200 Tasks)

# Правильно: обращаться только к нужной партиции
us_partition_idx = partitioned_by_country.partitioner.getPartition("US")
# В Scala: rdd.partitions(us_partition_idx) -> compute только эту партицию
# В PySpark прямой доступ к партиции через SparkContext:
us_data = sc.runJob(
    partitioned_by_country,
    lambda iter: list(iter),
    partitions=[us_partition_idx]  # только нужная партиция!
)

sc.runJob с явным указанием partitions — низкоуровневый API, который обрабатывает только указанные партиции. Для DataFrame-пайплайнов это работает через PartitionFilters в FileScan.

Co-partitioning: join без shuffle
Without co-partitioningRDD A: HashPartitioner(200). RDD B: HashPartitioner(200). Оба уже с одинаковым партиционером. join() видит equals() = true -> OneToOneDependency. Никакого ShuffleDependency.
join() checks partitionersDAGScheduler вызывает A.partitioner.equals(B.partitioner). HashPartitioner(200).equals(HashPartitioner(200)) = true. Создаётся ZippedPartitionsRDD2 с NarrowDependency.
Co-partitioned join: 0 shuffleКаждый executor обрабатывает ровно одну партицию из каждого RDD. Ключи в одной партиции гарантированно совпадают с ключами в соответствующей партиции другого RDD. Никакой сетевой передачи данных.

Выбор числа партиций

Правильное число партиций — не менее важно, чем тип партиционера. Практические правила для Spark 4.0:

Для shuffle-результатов (groupByKey, reduceByKey, join):

  • spark.sql.shuffle.partitions (default 200) для DataFrame.
  • Для RDD API передаётся явно: reduceByKey(f, numPartitions=400).
  • Ориентир: 100-200 МБ данных на партицию после shuffle.

Для чтения источников (sc.textFile, spark.read):

  • spark.default.parallelism для RDD (default = 2 * число CPU cores в кластере).
  • Для Parquet/ORC: определяется числом файлов и spark.sql.files.maxPartitionBytes (default 128 МБ).

AQE и автоматическая оптимизация: В Spark 4.0 с spark.sql.adaptive.enabled=true (default) AQE автоматически коалесцирует shuffle-партиции после выполнения shuffle. Но это работает только для DataFrame. Для RDD API AQE не применяется — нужно выбирать число партиций вручную.

Попробуй сам

Эксперимент: сравнение join с и без co-partitioning:

from pyspark import SparkContext, HashPartitioner
import time

sc = SparkContext.getOrCreate()

# Генерируем данные с распределёнными ключами
orders = sc.parallelize([(i % 1000, i) for i in range(1_000_000)], 100)
customers = sc.parallelize([(i, f"name_{i}") for i in range(1000)], 10)

# Замер 1: join без co-partitioning (будет shuffle)
start = time.time()
result1 = orders.join(customers)
count1 = result1.count()
time1 = time.time() - start
print(f"Join без co-partitioning: {time1:.2f}s, {count1} records")

# Партиционируем оба RDD с одинаковым партиционером
p = HashPartitioner(50)
orders_p = orders.partitionBy(p).persist()
customers_p = customers.partitionBy(p).persist()
orders_p.count()   # Прогрев кеша
customers_p.count()

# Замер 2: join с co-partitioning (без shuffle)
start = time.time()
result2 = orders_p.join(customers_p)  # Co-partitioned: нет ShuffleDependency
count2 = result2.count()
time2 = time.time() - start
print(f"Join с co-partitioning:   {time2:.2f}s, {count2} records")
print(f"Ускорение: {time1/time2:.1f}x")

# Проверяем: второй join не создаёт ShuffledRDD
print("\nLineage result1:")
print(result1.toDebugString().decode()[:300])
print("\nLineage result2:")
print(result2.toDebugString().decode()[:300])
# В result2 НЕ должно быть ShuffledRDD

Ожидаемый результат: второй join в 2-4x быстрее (зависит от конфигурации). В lineage result2 не будет ShuffledRDD — только ZippedPartitionsRDD2 с narrow dependency.

Проверка знанийKnowledge check
Production-задача: вы строите пайплайн, где 500GB RDD orders (ключ: customer_id) нужно join-ить с 500GB RDD transactions (ключ: customer_id) и потом join-ить результат с 1GB RDD customers (ключ: customer_id). Опишите оптимальную стратегию партиционирования для минимального числа shuffle, и объясните, почему broadcast join для customers недостаточно сам по себе.
ОтветAnswer
Оптимальная стратегия: (1) Co-partitioned join для orders и transactions: партиционировать оба с HashPartitioner(N) и persist(MEMORY_AND_DISK). Число партиций N: 500GB / 128MB = ~4000 партиций. Один shuffle на каждый из них = 2 shuffle. Потом join без третьего shuffle (co-partitioned). (2) Для customers (1GB) использовать broadcast join: spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '1073741824') (1GB). Тогда customers broadcast-ится на каждый executor, join без shuffle вообще. Итого: 2 shuffle вместо 3. Почему broadcast join для customers недостаточен САМИ ПО СЕБЕ без co-partitioning orders/transactions: даже если customers broadcast-ится, join orders.join(transactions) всё равно требует shuffle одного из них, потому что их партиционеры не совпадают. Broadcast убирает только shuffle с маленьким датасетом. Shuffle между двумя большими датасетами устраняется только co-partitioning. Практическое замечание: если persist() не умещается в память (500GB+500GB на executor-ах), использовать MEMORY_AND_DISK_SER (сжатый Kryo) или делать обе партиционирования в рамках одного Stage через partitionBy без предварительного persist — тогда они пересчитываются, но join всё равно без shuffle.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Вы выполняете df.sort('timestamp') на DataFrame с 500 партициями. Сколько Spark Jobs будет создано и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5