Партиционеры
Партиционер — один из пяти контрактных свойств RDD, но в production-коде он влияет на производительность больше остальных четырёх вместе взятых. Правильный партиционер устраняет shuffle. Неправильный — удваивает время выполнения. Полное отсутствие партиционера там, где он должен быть, — источник большинства «внезапных» performance regression в Spark.
Что такое Partitioner
Partitioner в Spark — абстрактный класс с двумя обязательными методами:
// org.apache.spark.Partitioner
abstract class Partitioner extends Serializable {
def numPartitions: Int // сколько партиций в выходном RDD
def getPartition(key: Any): Int // ключ -> номер партиции [0, numPartitions)
}
getPartition должен быть детерминированным: один и тот же ключ всегда должен давать один и тот же номер партиции. Только тогда co-partitioning работает корректно.
Партиционер устанавливается через поле RDD.partitioner: Option[Partitioner]. None означает, что партиционирование неизвестно — data layout непредсказуем. Это сигнал для DAGScheduler: если делаем join или groupByKey, нужен shuffle.
HashPartitioner: дефолт для key-value операций
HashPartitioner(numPartitions) — партиционер по умолчанию для groupByKey, reduceByKey, aggregateByKey, join. Логика проста:
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
// nonNegativeMod обрабатывает отрицательные hashCode:
// Math.floorMod(key.hashCode, numPartitions) -- в Java 8+
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner => h.numPartitions == numPartitions
case _ => false
}
}
Сравнение партиционеров через equals — критически важно. Именно equals используется при проверке co-partitioning: два RDD считаются co-partitioned, если их партиционеры равны по equals. HashPartitioner(200).equals(HashPartitioner(200)) — true. HashPartitioner(200).equals(HashPartitioner(201)) — false, и join потребует нового shuffle.
Проблема с HashPartitioner: распределение зависит от hashCode ключей. Если данные имеют data skew (много записей с одним ключом), одна партиция будет содержать значительно больше данных остальных. Типичный пример — groupByKey по country_code с 80% данных из США.
RangePartitioner: для упорядоченных данных
RangePartitioner используется при sortByKey и sortBy. Он обеспечивает глобальную сортировку: данные в партиции i всегда «меньше» данных в партиции i+1 по порядку сортировки.
Проблема с RangePartitioner: для правильного разбиения на диапазоны нужно знать распределение данных. Нельзя просто сказать «разбей на 200 равных диапазонов» без понимания того, в каком диапазоне лежат значения.
Spark решает это через выборку (sampling):
// Упрощённый алгоритм RangePartitioner
class RangePartitioner[K: Ordering: ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
ascending: Boolean = true,
val samplePointsPerPartitionHint: Int = 20 // Spark 4.0 default
) extends Partitioner {
// 1. Выборка данных из rdd
private val (rangeBounds: Array[K], formatter: Any => K) = {
val sampleSize = math.min(samplePointsPerPartitionHint.toLong * partitions, 1e6.toLong)
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
// sketch-sampling: каждая партиция выбирает sampleSizePerPartition элементов
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
// 2. determineBounds: по выборке строим границы диапазонов
val candidates = sketched.flatMap { case (idx, n, sample) =>
// взвешиваем sample по реальному размеру партиции
val weight = (n.toDouble / sample.length).toFloat
sample.map(x => (x, weight))
}
RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.length))
}
override def getPartition(key: Any): Int = {
// бинарный поиск по rangeBounds
val k = formatter(key)
var partition = 0
if (rangeBounds.length <= 128) {
// linear scan для малого числа диапазонов
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// бинарный поиск для большого числа
partition = binarySearch(rangeBounds, k)
}
if (ascending) partition else rangeBounds.length - partition
}
}
Ключевое: RangePartitioner при инициализации запускает Action — rdd.map(_._1).sample(false, fraction).collect(). Это означает, что sortByKey запускает два Job: первый — сбор выборки для построения границ, второй — фактическая сортировка с распределением по диапазонам. В Spark UI это будет выглядеть как два Job для одного sortByKey.
Важный параметр: samplePointsPerPartitionHint = 20 (default). Для 200 партиций Spark выберет min(20 * 200, 1_000_000) = 4000 ключей для построения границ. Если данные нестандартно распределены, это может дать плохие границы и data skew. Увеличение до 100-200 даёт точнее границы, но увеличивает overhead выборки.
Custom Partitioner
Стандартные партиционеры не всегда подходят. Типичные сценарии для custom:
- Географическое партиционирование: данные по странам, каждая страна — своя партиция.
- Временное партиционирование: данные по дням/часам, партиция = день.
- Domain-specific affinity: записи одного клиента должны быть в одной партиции.
# PySpark custom partitioner
from pyspark import Partitioner
class CountryPartitioner(Partitioner):
COUNTRIES = ["US", "GB", "DE", "FR", "JP", "CN", "BR", "IN", "AU", "OTHER"]
def __init__(self):
super().__init__()
def numPartitions(self):
return len(self.COUNTRIES)
def getPartition(self, key):
country = key.upper() if key else "OTHER"
if country in self.COUNTRIES:
return self.COUNTRIES.index(country)
return self.COUNTRIES.index("OTHER")
# Использование
orders = sc.parallelize([
("US", 100), ("GB", 200), ("DE", 150), ("US", 300),
("XX", 50), ("JP", 400), ("CN", 600), ("BR", 80)
])
partitioned = orders.partitionBy(CountryPartitioner())
print(f"Число партиций: {partitioned.getNumPartitions()}") # 10
# Проверим, что US всегда в одной партиции
us_partition = partitioned.partitioner.getPartition("US")
print(f"US в партиции: {us_partition}") # 0
# Данные из одной страны в одной партиции - join без shuffle!
customer_data = orders.filter(lambda x: x[0] in ["US", "GB"]) \
.partitionBy(CountryPartitioner())
joined = partitioned.join(customer_data) # Нет shuffle! Co-partitioned
print(joined.toDebugString().decode())
# Не должно быть ShuffledRDD
В Scala custom Partitioner реализует абстрактный класс org.apache.spark.Partitioner:
class TimeBasedPartitioner(val numPartitions: Int, epochSeconds: Long => Int)
extends Partitioner {
override def getPartition(key: Any): Int = key match {
case ts: Long =>
val slot = epochSeconds(ts) // функция для маппинга timestamp -> slot
Utils.nonNegativeMod(slot, numPartitions)
case _ => 0
}
override def equals(other: Any): Boolean = other match {
case p: TimeBasedPartitioner =>
p.numPartitions == numPartitions && p.epochSeconds == epochSeconds
case _ => false
}
override def hashCode(): Int = numPartitions // обязателен вместе с equals!
}
Критически важно: если переопределяете equals, обязательно переопределяйте и hashCode. Иначе co-partitioning не будет работать корректно в HashMap-lookups внутри DAGScheduler.
Co-partitioning: устранение shuffle в join
Главный производственный приём с партиционерами — co-partitioning: сохранить оба входных RDD с одинаковым партиционером, и тогда join не потребует shuffle.
# Проблема: два дорогих shuffle при join
orders_by_customer = orders.groupByKey(200) # shuffle 1: HashPartitioner(200)
customers_enriched = customers.groupByKey(200) # shuffle 2: HashPartitioner(200)
# join увидит два RDD с HashPartitioner(200) -- они равны по equals()!
# НЕТ третьего shuffle
result = orders_by_customer.join(customers_enriched) # Co-partitioned join!
# Но если один из них был repartitioned по-другому:
orders_100 = orders.groupByKey(100) # HashPartitioner(100)
customers_200 = customers.groupByKey(200) # HashPartitioner(200)
# НЕ равны -> будет shuffle при join
bad_result = orders_100.join(customers_200) # <-- ShuffleDependency, лишний shuffle
Правило: для co-partitioned join нужно не просто «одинаковое число партиций», а одинаковый партиционер (по equals). HashPartitioner(200).equals(HashPartitioner(200)) — true. Разные типы партиционеров — никогда не равны.
Практический паттерн для production-пайплайнов с многократным join:
# Партиционируем оба датасета ОДИН РАЗ и кешируем
PARTITIONS = 400
orders_p = orders.map(lambda r: (r.customer_id, r)) \
.partitionBy(HashPartitioner(PARTITIONS)) \
.persist() # кешируем, чтобы использовать многократно
customers_p = customers.map(lambda r: (r.id, r)) \
.partitionBy(HashPartitioner(PARTITIONS)) \
.persist()
# Все последующие join-ы будут без shuffle
result1 = orders_p.join(customers_p) # без shuffle
result2 = orders_p.join(customers_p).join(payments_p) # может быть без shuffle,
# если payments_p тоже HashPartitioner(PARTITIONS)
Partition pruning
Partition pruning — оптимизация, при которой Spark не читает партиции, которые не содержат нужных данных. Это работает через метаданные, а не через сам Partitioner.
Для Hive-таблиц с partition columns (папки вида year=2024/month=01):
df = spark.read.parquet("hdfs:///data/events/")
# В Hive-метаданных зарегистрировано: партиции year=2023, year=2024, year=2025
result = df.filter(df.year == 2024)
result.explain("formatted")
# PartitionFilters: [isnotnull(year#0), (year#0 = 2024)]
# В FileScan будет: только директория year=2024/ — pruning!
Для RDD с кастомным партиционером pruning не работает автоматически — нужно вручную ограничивать partitions:
# Вместо полного scan + filter:
all_us_data = partitioned_by_country.filter(lambda kv: kv[0] == "US")
# Это создаёт MapPartitionsRDD по ВСЕМ партициям (200 Tasks)
# Правильно: обращаться только к нужной партиции
us_partition_idx = partitioned_by_country.partitioner.getPartition("US")
# В Scala: rdd.partitions(us_partition_idx) -> compute только эту партицию
# В PySpark прямой доступ к партиции через SparkContext:
us_data = sc.runJob(
partitioned_by_country,
lambda iter: list(iter),
partitions=[us_partition_idx] # только нужная партиция!
)
sc.runJob с явным указанием partitions — низкоуровневый API, который обрабатывает только указанные партиции. Для DataFrame-пайплайнов это работает через PartitionFilters в FileScan.
Выбор числа партиций
Правильное число партиций — не менее важно, чем тип партиционера. Практические правила для Spark 4.0:
Для shuffle-результатов (groupByKey, reduceByKey, join):
spark.sql.shuffle.partitions(default 200) для DataFrame.- Для RDD API передаётся явно:
reduceByKey(f, numPartitions=400). - Ориентир: 100-200 МБ данных на партицию после shuffle.
Для чтения источников (sc.textFile, spark.read):
spark.default.parallelismдля RDD (default = 2 * число CPU cores в кластере).- Для Parquet/ORC: определяется числом файлов и
spark.sql.files.maxPartitionBytes(default 128 МБ).
AQE и автоматическая оптимизация:
В Spark 4.0 с spark.sql.adaptive.enabled=true (default) AQE автоматически коалесцирует shuffle-партиции после выполнения shuffle. Но это работает только для DataFrame. Для RDD API AQE не применяется — нужно выбирать число партиций вручную.
Попробуй сам
Эксперимент: сравнение join с и без co-partitioning:
from pyspark import SparkContext, HashPartitioner
import time
sc = SparkContext.getOrCreate()
# Генерируем данные с распределёнными ключами
orders = sc.parallelize([(i % 1000, i) for i in range(1_000_000)], 100)
customers = sc.parallelize([(i, f"name_{i}") for i in range(1000)], 10)
# Замер 1: join без co-partitioning (будет shuffle)
start = time.time()
result1 = orders.join(customers)
count1 = result1.count()
time1 = time.time() - start
print(f"Join без co-partitioning: {time1:.2f}s, {count1} records")
# Партиционируем оба RDD с одинаковым партиционером
p = HashPartitioner(50)
orders_p = orders.partitionBy(p).persist()
customers_p = customers.partitionBy(p).persist()
orders_p.count() # Прогрев кеша
customers_p.count()
# Замер 2: join с co-partitioning (без shuffle)
start = time.time()
result2 = orders_p.join(customers_p) # Co-partitioned: нет ShuffleDependency
count2 = result2.count()
time2 = time.time() - start
print(f"Join с co-partitioning: {time2:.2f}s, {count2} records")
print(f"Ускорение: {time1/time2:.1f}x")
# Проверяем: второй join не создаёт ShuffledRDD
print("\nLineage result1:")
print(result1.toDebugString().decode()[:300])
print("\nLineage result2:")
print(result2.toDebugString().decode()[:300])
# В result2 НЕ должно быть ShuffledRDD
Ожидаемый результат: второй join в 2-4x быстрее (зависит от конфигурации). В lineage result2 не будет ShuffledRDD — только ZippedPartitionsRDD2 с narrow dependency.