Dynamic Partition Pruning (DPP)
Проблема: сканирование всей fact-таблицы
В star schema (звёздная схема) типичный запрос соединяет большую fact-таблицу с маленькими dimension-таблицами:
# fact-таблица: 500M строк, партиционирована по date_key
df_sales = spark.read.parquet("/data/sales/")
# dimension-таблица: 365 строк
df_dates = spark.read.parquet("/data/dim_dates/")
# Запрос: продажи за Q1 2024
result = (
df_sales
.join(df_dates, "date_key")
.filter(col("quarter") == "Q1-2024")
.groupBy("product_id")
.agg(sum("amount"))
)
Без DPP: Spark сначала сканирует все 500M строк из sales, затем выполняет join с dim_dates, и только потом фильтрует по quarter = 'Q1-2024'. Фильтр на quarter применяется к dimension-таблице, но Catalyst не может “пробросить” его через join к fact-таблице, потому что quarter — колонка dimension, а не fact.
Без DPP:
Scan sales: 500M rows (ALL partitions) → Join → Filter quarter=Q1 → Result
Scan dim_dates: 365 rows ↗
Сканировано: 500M строк
Реально нужно: ~125M строк (Q1 = 25% данных)
Что такое DPP?
Dynamic Partition Pruning решает эту проблему, внедряя runtime-фильтр из dimension-таблицы в scan fact-таблицы. DPP работает так:
- Spark сначала выполняет scan + filter dimension-таблицы (365 строк → 90 строк за Q1)
- Из результата извлекает список
date_keyзначений (90 ключей) - Внедряет этот список как фильтр в scan fact-таблицы
- Fact-таблица сканируется только по партициям, содержащим эти 90 date_key
С DPP:
Scan dim_dates: 365 rows → Filter quarter=Q1 → 90 date_keys
↓
DPP Filter (runtime)
↓
Scan sales: ONLY 90 partitions (125M rows) → Join → Result
Сканировано: 125M строк вместо 500M → 75% reduction!
В explain() DPP отображается как DynamicPruningExpression:
== Physical Plan ==
*(3) HashAggregate(keys=[product_id], functions=[sum(amount)])
+- *(3) BroadcastHashJoin [date_key], [date_key]
:- *(1) FileScan parquet [sales]
: PartitionFilters: [date_key IN (dynamicpruning#123)] # <-- DPP!
: DynamicPruningExpression: date_key IN (subquery#456)
+- BroadcastExchange
+- *(2) Filter (quarter = Q1-2024)
+- *(2) FileScan parquet [dim_dates]
Когда DPP работает
DPP активируется при выполнении всех условий:
- Fact-таблица партиционирована по join-ключу (или колонке, связанной с join-ключом)
- Join использует BroadcastExchange — dimension-таблица достаточно мала для broadcast
- Фильтр на dimension сужает количество ключей
spark.sql.optimizer.dynamicPartitionPruning.enabled = true(по умолчанию)
# DPP работает: fact партиционирована по date_key, dimension маленькая
df_sales = spark.read.parquet("/data/sales/") # partitioned by date_key
result = df_sales.join(df_dates, "date_key").filter(col("quarter") == "Q1")
# DPP НЕ работает: fact НЕ партиционирована
df_logs = spark.read.parquet("/data/logs_flat/") # no partitioning
result = df_logs.join(df_dates, "date_key").filter(col("quarter") == "Q1")
# Spark всё равно сканирует все файлы -- DPP бесполезен без партиционирования
DPP требует партиционирования! Если ваша fact-таблица не партиционирована по join-ключу, DPP не поможет. Это самая частая причина, по которой разработчики ожидают DPP, но не получают его. Проверьте: df.rdd.getNumPartitions() покажет число партиций, но для DPP важно именно file-level partitioning (как в Parquet/Hive partitioned tables).
Before/After: сокращение scan
Сценарий: star schema с fact-таблицей sales (500M строк, партиционирована по date_key, 365 партиций) и dimension-таблицей dim_dates (365 строк).
| Метрика | Без DPP | С DPP |
|---|---|---|
| Партиций просканировано | 365 | 90 (Q1) |
| Строк прочитано | 500M | 125M |
| Bytes scanned | 200 ГБ | 50 ГБ |
| Scan time | 120 сек | 30 сек |
| Ускорение scan | — | 4x |
Для более селективных фильтров (один месяц, один день) экономия ещё больше:
| Фильтр | Партиций | Scan reduction |
|---|---|---|
quarter = 'Q1' | 90 из 365 | 75% |
month = 'January' | 31 из 365 | 92% |
date_key = '2024-01-15' | 1 из 365 | 99.7% |
Конфигурация DPP
| Параметр | Значение | Назначение |
|---|---|---|
spark.sql.optimizer.dynamicPartitionPruning.enabled | true | Главный переключатель DPP |
spark.sql.optimizer.dynamicPartitionPruning.useStats | true | Использовать статистики для оценки эффективности DPP |
spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio | 0.5 | Если > 50% партиций остаётся — DPP может быть невыгоден |
spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly | true | Использовать DPP только если broadcast уже есть (без дополнительного shuffle) |
# Проверка настроек DPP
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled") # true
# DPP включён по умолчанию -- обычно настройка не требуется
Анти-паттерн: ожидание DPP на непартиционированных таблицах
# Анти-паттерн: таблица не партиционирована
df_events = spark.read.parquet("/data/events/") # flat directory, no partitioning
df_users = spark.read.parquet("/data/dim_users/")
result = df_events.join(df_users, "user_id").filter(col("country") == "RU")
# DPP НЕ может пропустить файлы -- нет partition metadata для pruning
# Решение: перепартиционируйте fact-таблицу
df_events.write.partitionBy("user_country").parquet("/data/events_partitioned/")
# Теперь DPP сможет пропускать партиции по country
Правило: DPP — это data layout optimization. Если ваши данные не партиционированы по join-ключу, DPP физически не может пропустить файлы. Планируйте партиционирование при записи данных, а не при чтении.
DPP и AQE: взаимодействие
DPP — это compile-time оптимизация (часть Catalyst optimizer), а не runtime AQE. Но AQE усиливает DPP:
- AQE может менять join-стратегию на BroadcastHashJoin во время выполнения, включая DPP, который не был доступен в compile-time плане
- AQE собирает точные runtime статистики, которые DPP использует для оценки selectivity
- Результат: DPP + AQE вместе дают больше, чем каждый по отдельности
Что дальше?
В финальном уроке этого модуля мы разберём runtime фильтры и адаптацию плана — как AQE меняет стратегию join на лету, оптимизирует local shuffle reader и когда AQE всё-таки недостаточен.